diff --git a/Dockerfile_stock b/Dockerfile_stock new file mode 100644 index 000000000..f6f329454 --- /dev/null +++ b/Dockerfile_stock @@ -0,0 +1,3 @@ +# builds an image of standard airflow, that includes workflows in ./workflows/image/airflow-dags +FROM apache/airflow:2.4.3 +COPY workflows/image/airflow-dags /opt/airflow/dags diff --git a/README.md b/README.md index 9cbffe695..7455e64c7 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ cd .. Now, the Kubernetes cluster and Knative should be ready. It is time to deploy this fork of airflow with the following commands: ```bash -git clone --single-branch --branch integrate-knative --depth 1 git@github.com:eth-easl/airflow.git +git clone --depth 1 git@github.com:eth-easl/airflow.git cd airflow ./scripts/setup_airflow.sh ``` @@ -184,3 +184,20 @@ With the pod id, run ```bash kubectl -n airflow logs ``` + +## Running benchmarks +Run `./scripts/run_benchmarks` to run the benchmark workflows. +It will delete the kubernetes namespace `airflow`, so make sure there is nothing important +in it. +The script will run the benchmark workflows against the modified version of Airflow with Knative +and against an unmodified, stock version of Airflow. + +Existing benchmark data is in `benchmark_data.tar.gz`. +Unpack it to `benchmark_data` to use the provided plotting and analysis scripts. +E.g. +```bash +tar -xvzf benchmark_data.tar.gz +python3 scripts/analyze_per_task_latency.py benchmark_data/benchmarking_logs_1676206749/log_scheduler_benchmark_w8_d3.txt +python3 scripts/plot_e2e_latency.py +python3 scripts/plot_throughput_width.py +``` diff --git a/benchmark_data.tar.gz b/benchmark_data.tar.gz new file mode 100644 index 000000000..df4dadb58 Binary files /dev/null and b/benchmark_data.tar.gz differ diff --git a/cluster_scripts/setup_single.sh b/cluster_scripts/setup_single.sh deleted file mode 100644 index faa3215d4..000000000 --- a/cluster_scripts/setup_single.sh +++ /dev/null @@ -1,24 +0,0 @@ -git clone --depth=1 https://github.com/vhive-serverless/vhive.git -cd vhive -mkdir -p /tmp/vhive-logs -./scripts/cloudlab/setup_node.sh > >(tee -a /tmp/vhive-logs/setup_node.stdout) 2> >(tee -a /tmp/vhive-logs/setup_node.stderr >&2) -./scripts/cloudlab/setup_node.sh; -sudo screen -dmS containerd containerd; sleep 5; -sudo PATH=$PATH screen -dmS firecracker /usr/local/bin/firecracker-containerd --config /etc/firecracker-containerd/config.toml; sleep 5; -source /etc/profile && go build; -sudo screen -dmS vhive ./vhive; sleep 5; -./scripts/cluster/create_one_node_cluster.sh - -curl https://baltocdn.com/helm/signing.asc | gpg --dearmor | sudo tee /usr/share/keyrings/helm.gpg > /dev/null -sudo apt-get install apt-transport-https --yes -echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/helm.gpg] https://baltocdn.com/helm/stable/debian/ all main" | sudo tee /etc/apt/sources.list.d/helm-stable-debian.list -sudo apt-get update -sudo apt-get install helm - -kubectl create namespace airflow -sudo mkdir /mnt/data{0..19} -sudo chmod 777 /mnt/data* -kubectl -n airflow create -f volumes.yaml -helm repo add apache-airflow https://airflow.apache.org -helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml --debug - diff --git a/cluster_scripts/setup_stock_single.sh b/cluster_scripts/setup_stock_single.sh deleted file mode 100755 index f7f50100c..000000000 --- a/cluster_scripts/setup_stock_single.sh +++ /dev/null @@ -1,14 +0,0 @@ -# cluster setup -git clone https://github.com/vhive-serverless/vhive -cd vhive 
-./scripts/cloudlab/setup_node.sh stock-only -sudo containerd -./scripts/cluster/create_one_node_cluster.sh stock-only - -# install helm -curl https://baltocdn.com/helm/signing.asc | gpg --dearmor | sudo tee /usr/share/keyrings/helm.gpg > /dev/null -sudo apt-get install apt-transport-https --yes -echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/helm.gpg] https://baltocdn.com/helm/stable/debian/ all main" | sudo tee /etc/apt/sources.list.d/helm-stable-debian.list -sudo apt-get update -sudo apt-get install helm - diff --git a/configs/values-stock.yaml b/configs/values-stock.yaml new file mode 100644 index 000000000..f3706e86a --- /dev/null +++ b/configs/values-stock.yaml @@ -0,0 +1,1856 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +--- +# Default values for airflow. +# This is a YAML-formatted file. +# Declare variables to be passed into your templates. + +# Provide a name to substitute for the full names of resources +fullnameOverride: "" + +# Provide a name to substitute for the name of the chart +nameOverride: "" + +# Provide a Kubernetes version (used for API Version selection) to override the auto-detected version +kubeVersionOverride: "" + +# Max number of old replicasets to retain. Can be overridden by each deployment's revisionHistoryLimit +revisionHistoryLimit: ~ + +# User and group of airflow user +uid: 50000 +gid: 0 + +# Default security context for airflow +securityContext: {} +# runAsUser: 50000 +# fsGroup: 0 +# runAsGroup: 0 + +# Airflow home directory +# Used for mount paths +airflowHome: /opt/airflow + +# Default airflow repository -- overridden by all the specific images below +defaultAirflowRepository: apache/airflow + +# Default airflow tag to deploy +defaultAirflowTag: "2.4.1" + +# Airflow version (Used to make some decisions based on Airflow Version being deployed) +airflowVersion: "2.4.1" + +# Images +images: + airflow: + repository: ghcr.io/jonamuen/airflow-stock + tag: latest + pullPolicy: Always + # To avoid images with user code, you can turn this to 'true' and + # all the 'run-airflow-migrations' and 'wait-for-airflow-migrations' containers/jobs + # will use the images from 'defaultAirflowRepository:defaultAirflowTag' values + # to run and wait for DB migrations . 
+ useDefaultImageForMigration: false + # timeout (in seconds) for airflow-migrations to complete + migrationsWaitTimeout: 60 + pod_template: + repository: ~ + tag: ~ + pullPolicy: IfNotPresent + flower: + repository: ~ + tag: ~ + pullPolicy: IfNotPresent + statsd: + repository: quay.io/prometheus/statsd-exporter + tag: v0.22.8 + pullPolicy: IfNotPresent + redis: + repository: redis + tag: 6-bullseye + pullPolicy: IfNotPresent + pgbouncer: + repository: apache/airflow + tag: airflow-pgbouncer-2021.04.28-1.14.0 + pullPolicy: IfNotPresent + pgbouncerExporter: + repository: apache/airflow + tag: airflow-pgbouncer-exporter-2021.09.22-0.12.0 + pullPolicy: IfNotPresent + gitSync: + repository: k8s.gcr.io/git-sync/git-sync + tag: v3.4.0 + pullPolicy: IfNotPresent + +# Select certain nodes for airflow pods. +nodeSelector: {} +affinity: {} +tolerations: [] +topologySpreadConstraints: [] + +# Add common labels to all objects and pods defined in this chart. +labels: {} + +# Ingress configuration +ingress: + # Enable all ingress resources (deprecated - use ingress.web.enabled and ingress.flower.enabled) + enabled: ~ + + # Configs for the Ingress of the web Service + web: + # Enable web ingress resource + enabled: false + + # Annotations for the web Ingress + annotations: {} + + # The path for the web Ingress + path: "/" + + # The pathType for the above path (used only with Kubernetes v1.19 and above) + pathType: "ImplementationSpecific" + + # The hostname for the web Ingress (Deprecated - renamed to `ingress.web.hosts`) + host: "" + + # The hostnames or hosts configuration for the web Ingress + hosts: [] + # - name: "" + # # configs for web Ingress TLS + # tls: + # # Enable TLS termination for the web Ingress + # enabled: false + # # the name of a pre-created Secret containing a TLS private key and certificate + # secretName: "" + + # The Ingress Class for the web Ingress (used only with Kubernetes v1.19 and above) + ingressClassName: "" + + # configs for web Ingress TLS (Deprecated - renamed to `ingress.web.hosts[*].tls`) + tls: + # Enable TLS termination for the web Ingress + enabled: false + # the name of a pre-created Secret containing a TLS private key and certificate + secretName: "" + + # HTTP paths to add to the web Ingress before the default path + precedingPaths: [] + + # Http paths to add to the web Ingress after the default path + succeedingPaths: [] + + # Configs for the Ingress of the flower Service + flower: + # Enable web ingress resource + enabled: false + + # Annotations for the flower Ingress + annotations: {} + + # The path for the flower Ingress + path: "/" + + # The pathType for the above path (used only with Kubernetes v1.19 and above) + pathType: "ImplementationSpecific" + + # The hostname for the flower Ingress (Deprecated - renamed to `ingress.flower.hosts`) + host: "" + + # The hostnames or hosts configuration for the flower Ingress + hosts: [] + # - name: "" + # tls: + # # Enable TLS termination for the flower Ingress + # enabled: false + # # the name of a pre-created Secret containing a TLS private key and certificate + # secretName: "" + + # The Ingress Class for the flower Ingress (used only with Kubernetes v1.19 and above) + ingressClassName: "" + + # configs for flower Ingress TLS (Deprecated - renamed to `ingress.flower.hosts[*].tls`) + tls: + # Enable TLS termination for the flower Ingress + enabled: false + # the name of a pre-created Secret containing a TLS private key and certificate + secretName: "" + +# Network policy configuration +networkPolicies: + # Enabled 
network policies + enabled: false + +# Extra annotations to apply to all +# Airflow pods +airflowPodAnnotations: {} + +# Extra annotations to apply to +# main Airflow configmap +airflowConfigAnnotations: {} + +# `airflow_local_settings` file as a string (can be templated). +airflowLocalSettings: |- + {{- if semverCompare ">=2.2.0" .Values.airflowVersion }} + {{- if not (or .Values.webserverSecretKey .Values.webserverSecretKeySecretName) }} + from airflow.www.utils import UIAlert + + DASHBOARD_UIALERTS = [ + UIAlert( + 'Usage of a dynamic webserver secret key detected. We recommend a static webserver secret key instead.' + ' See the ' + 'Helm Chart Production Guide for more details.', + category="warning", + roles=["Admin"], + html=True, + ) + ] + {{- end }} + {{- end }} + +# Enable RBAC (default on most clusters these days) +rbac: + # Specifies whether RBAC resources should be created + create: true + createSCCRoleBinding: false + +# Airflow executor +# One of: LocalExecutor, LocalKubernetesExecutor, CeleryExecutor, KubernetesExecutor, CeleryKubernetesExecutor +# executor: "CeleryExecutor" +executor: "KubernetesExecutor" + +# If this is true and using LocalExecutor/KubernetesExecutor/CeleryKubernetesExecutor, the scheduler's +# service account will have access to communicate with the api-server and launch pods. +# If this is true and using CeleryExecutor/KubernetesExecutor/CeleryKubernetesExecutor, the workers +# will be able to launch pods. +allowPodLaunching: true + +# Environment variables for all airflow containers +env: [] +# - name: "" +# value: "" + +# Secrets for all airflow containers +secret: [] +# - envName: "" +# secretName: "" +# secretKey: "" + +# Enables selected built-in secrets that are set via environment variables by default. +# Those secrets are provided by the Helm Chart secrets by default but in some cases you +# might want to provide some of those variables with _CMD or _SECRET variable, and you should +# in this case disable setting of those variables by setting the relevant configuration to false. +enableBuiltInSecretEnvVars: + AIRFLOW__CORE__FERNET_KEY: true + # For Airflow <2.3, backward compatibility; moved to [database] in 2.3 + AIRFLOW__CORE__SQL_ALCHEMY_CONN: true + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: true + AIRFLOW_CONN_AIRFLOW_DB: true + AIRFLOW__WEBSERVER__SECRET_KEY: true + AIRFLOW__CELERY__CELERY_RESULT_BACKEND: true + AIRFLOW__CELERY__RESULT_BACKEND: true + AIRFLOW__CELERY__BROKER_URL: true + AIRFLOW__ELASTICSEARCH__HOST: true + AIRFLOW__ELASTICSEARCH__ELASTICSEARCH_HOST: true + +# Extra secrets that will be managed by the chart +# (You can use them with extraEnv or extraEnvFrom or some of the extraVolumes values). +# The format for secret data is "key/value" where +# * key (can be templated) is the name of the secret that will be created +# * value: an object with the standard 'data' or 'stringData' key (or both). +# The value associated with those keys must be a string (can be templated) +extraSecrets: {} +# eg: +# extraSecrets: +# '{{ .Release.Name }}-airflow-connections': +# type: 'Opaque' +# labels: +# my.custom.label/v1: my_custom_label_value_1 +# data: | +# AIRFLOW_CONN_GCP: 'base64_encoded_gcp_conn_string' +# AIRFLOW_CONN_AWS: 'base64_encoded_aws_conn_string' +# stringData: | +# AIRFLOW_CONN_OTHER: 'other_conn' +# '{{ .Release.Name }}-other-secret-name-suffix': +# data: | +# ... + +# Extra ConfigMaps that will be managed by the chart +# (You can use them with extraEnv or extraEnvFrom or some of the extraVolumes values). 
+# The format for configmap data is "key/value" where +# * key (can be templated) is the name of the configmap that will be created +# * value: an object with the standard 'data' key. +# The value associated with this keys must be a string (can be templated) +extraConfigMaps: {} +# eg: +# extraConfigMaps: +# '{{ .Release.Name }}-airflow-variables': +# labels: +# my.custom.label/v2: my_custom_label_value_2 +# data: | +# AIRFLOW_VAR_HELLO_MESSAGE: "Hi!" +# AIRFLOW_VAR_KUBERNETES_NAMESPACE: "{{ .Release.Namespace }}" + +# Extra env 'items' that will be added to the definition of airflow containers +# a string is expected (can be templated). +# TODO: difference from `env`? This is a templated string. Probably should template `env` and remove this. +extraEnv: ~ +# eg: +# extraEnv: | +# - name: AIRFLOW__CORE__LOAD_EXAMPLES +# value: 'True' + +# Extra envFrom 'items' that will be added to the definition of airflow containers +# A string is expected (can be templated). +extraEnvFrom: ~ +# eg: +# extraEnvFrom: | +# - secretRef: +# name: '{{ .Release.Name }}-airflow-connections' +# - configMapRef: +# name: '{{ .Release.Name }}-airflow-variables' + +# Airflow database & redis config +data: + # If secret names are provided, use those secrets + # These secrets must be created manually, eg: + # + # kind: Secret + # apiVersion: v1 + # metadata: + # name: custom-airflow-metadata-secret + # type: Opaque + # data: + # connection: base64_encoded_connection_string + + metadataSecretName: ~ + # When providing secret names and using the same database for metadata and + # result backend, for Airflow < 2.4.0 it is necessary to create a separate + # secret for result backend but with a db+ scheme prefix. + # For Airflow >= 2.4.0 it is possible to not specify the secret again, + # as Airflow will use sql_alchemy_conn with a db+ scheme prefix by default. + resultBackendSecretName: ~ + brokerUrlSecretName: ~ + + # Otherwise pass connection values in + metadataConnection: + user: postgres + pass: postgres + protocol: postgresql + host: ~ + port: 5432 + db: postgres + sslmode: disable + # resultBackendConnection defaults to the same database as metadataConnection + resultBackendConnection: ~ + # or, you can use a different database + # resultBackendConnection: + # user: postgres + # pass: postgres + # protocol: postgresql + # host: ~ + # port: 5432 + # db: postgres + # sslmode: disable + # Note: brokerUrl can only be set during install, not upgrade + brokerUrl: ~ + +# Fernet key settings +# Note: fernetKey can only be set during install, not upgrade +fernetKey: ~ +fernetKeySecretName: ~ + +# Flask secret key for Airflow Webserver: `[webserver] secret_key` in airflow.cfg +webserverSecretKey: ~ +webserverSecretKeySecretName: ~ + +# In order to use kerberos you need to create secret containing the keytab file +# The secret name should follow naming convention of the application where resources are +# name {{ .Release-name }}-. In case of the keytab file, the postfix is "kerberos-keytab" +# So if your release is named "my-release" the name of the secret should be "my-release-kerberos-keytab" +# +# The Keytab content should be available in the "kerberos.keytab" key of the secret. 
+# +# apiVersion: v1 +# kind: Secret +# data: +# kerberos.keytab: +# type: Opaque +# +# +# If you have such keytab file you can do it with similar +# +# kubectl create secret generic {{ .Release.name }}-kerberos-keytab --from-file=kerberos.keytab +# +# +# Alternatively, instead of manually creating the secret, it is possible to specify +# kerberos.keytabBase64Content parameter. This parameter should contain base64 encoded keytab. +# + +kerberos: + enabled: false + ccacheMountPath: /var/kerberos-ccache + ccacheFileName: cache + configPath: /etc/krb5.conf + keytabBase64Content: ~ + keytabPath: /etc/airflow.keytab + principal: airflow@FOO.COM + reinitFrequency: 3600 + config: | + # This is an example config showing how you can use templating and how "example" config + # might look like. It works with the test kerberos server that we are using during integration + # testing at Apache Airflow (see `scripts/ci/docker-compose/integration-kerberos.yml` but in + # order to make it production-ready you must replace it with your own configuration that + # Matches your kerberos deployment. Administrators of your Kerberos instance should + # provide the right configuration. + + [logging] + default = "FILE:{{ template "airflow_logs_no_quote" . }}/kerberos_libs.log" + kdc = "FILE:{{ template "airflow_logs_no_quote" . }}/kerberos_kdc.log" + admin_server = "FILE:{{ template "airflow_logs_no_quote" . }}/kadmind.log" + + [libdefaults] + default_realm = FOO.COM + ticket_lifetime = 10h + renew_lifetime = 7d + forwardable = true + + [realms] + FOO.COM = { + kdc = kdc-server.foo.com + admin_server = admin_server.foo.com + } + +# Airflow Worker Config +workers: + # Number of airflow celery workers in StatefulSet + replicas: 1 + # Max number of old replicasets to retain + revisionHistoryLimit: ~ + + # Command to use when running Airflow workers (templated). + command: ~ + # Args to use when running Airflow workers (templated). + args: + - "bash" + - "-c" + # The format below is necessary to get `helm lint` happy + - |- + exec \ + airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "celery worker" "worker" }} + + # If the worker stops responding for 5 minutes (5*60s) kill the + # worker and let Kubernetes restart it + livenessProbe: + enabled: true + initialDelaySeconds: 10 + timeoutSeconds: 20 + failureThreshold: 5 + periodSeconds: 60 + command: ~ + + # Update Strategy when worker is deployed as a StatefulSet + updateStrategy: ~ + # Update Strategy when worker is deployed as a Deployment + strategy: + rollingUpdate: + maxSurge: "100%" + maxUnavailable: "50%" + + # When not set, the values defined in the global securityContext will be used + securityContext: {} + # runAsUser: 50000 + # fsGroup: 0 + # runAsGroup: 0 + + # Create ServiceAccount + serviceAccount: + # Specifies whether a ServiceAccount should be created + create: true + # The name of the ServiceAccount to use. + # If not set and create is true, a name is generated using the release name + name: ~ + + # Annotations to add to worker kubernetes service account. + annotations: {} + + # Allow KEDA autoscaling. + # Persistence.enabled must be set to false to use KEDA. + keda: + enabled: false + namespaceLabels: {} + + # How often KEDA polls the airflow DB to report new scale requests to the HPA + pollingInterval: 5 + + # How many seconds KEDA will wait before scaling to zero. 
+ # Note that HPA has a separate cooldown period for scale-downs + cooldownPeriod: 30 + + # Minimum number of workers created by keda + minReplicaCount: 0 + + # Maximum number of workers created by keda + maxReplicaCount: 10 + + # Specify HPA related options + advanced: {} + # horizontalPodAutoscalerConfig: + # behavior: + # scaleDown: + # stabilizationWindowSeconds: 300 + # policies: + # - type: Percent + # value: 100 + # periodSeconds: 15 + + persistence: + # Enable persistent volumes + enabled: true + # Volume size for worker StatefulSet + size: 5Gi + # If using a custom storageClass, pass name ref to all statefulSets here + storageClassName: + # Execute init container to chown log directory. + # This is currently only needed in kind, due to usage + # of local-path provisioner. + fixPermissions: false + # Annotations to add to worker volumes + annotations: {} + + kerberosSidecar: + # Enable kerberos sidecar + enabled: false + resources: {} + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + + resources: {} + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + + # Grace period for tasks to finish after SIGTERM is sent from kubernetes + terminationGracePeriodSeconds: 600 + + # This setting tells kubernetes that its ok to evict + # when it wants to scale a node down. + safeToEvict: true + + # Launch additional containers into worker. + # Note: If used with KubernetesExecutor, you are responsible for signaling sidecars to exit when the main + # container finishes so Airflow can continue the worker shutdown process! + extraContainers: [] + # Add additional init containers into workers. + extraInitContainers: [] + + # Mount additional volumes into worker. + extraVolumes: [] + extraVolumeMounts: [] + + # Select certain nodes for airflow worker pods. + nodeSelector: {} + priorityClassName: ~ + affinity: {} + # default worker affinity is: + # podAntiAffinity: + # preferredDuringSchedulingIgnoredDuringExecution: + # - podAffinityTerm: + # labelSelector: + # matchLabels: + # component: worker + # topologyKey: kubernetes.io/hostname + # weight: 100 + tolerations: [] + topologySpreadConstraints: [] + # hostAliases to use in worker pods. + # See: + # https://kubernetes.io/docs/concepts/services-networking/add-entries-to-pod-etc-hosts-with-host-aliases/ + hostAliases: [] + # - ip: "127.0.0.2" + # hostnames: + # - "test.hostname.one" + # - ip: "127.0.0.3" + # hostnames: + # - "test.hostname.two" + + podAnnotations: {} + + # Labels specific to workers objects and pods + labels: {} + + logGroomerSidecar: + # Command to use when running the Airflow worker log groomer sidecar (templated). + command: ~ + # Args to use when running the Airflow worker log groomer sidecar (templated). 
+ args: ["bash", "/clean-logs"] + # Number of days to retain logs + retentionDays: 15 + resources: {} + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + + waitForMigrations: + env: [] + + env: [] + +# Airflow scheduler settings +scheduler: + # If the scheduler stops heartbeating for 5 minutes (5*60s) kill the + # scheduler and let Kubernetes restart it + livenessProbe: + initialDelaySeconds: 10 + timeoutSeconds: 20 + failureThreshold: 5 + periodSeconds: 60 + command: ~ + # Airflow 2.0 allows users to run multiple schedulers, + # However this feature is only recommended for MySQL 8+ and Postgres + replicas: 1 + # Max number of old replicasets to retain + revisionHistoryLimit: ~ + + # Command to use when running the Airflow scheduler (templated). + command: ~ + # Args to use when running the Airflow scheduler (templated). + args: ["bash", "-c", "exec airflow scheduler"] + + # Update Strategy when scheduler is deployed as a StatefulSet + # (when using LocalExecutor and workers.persistence) + updateStrategy: ~ + # Update Strategy when scheduler is deployed as a Deployment + # (when not using LocalExecutor and workers.persistence) + strategy: ~ + + # When not set, the values defined in the global securityContext will be used + securityContext: {} + # runAsUser: 50000 + # fsGroup: 0 + # runAsGroup: 0 + + # Create ServiceAccount + serviceAccount: + # Specifies whether a ServiceAccount should be created + create: true + # The name of the ServiceAccount to use. + # If not set and create is true, a name is generated using the release name + name: ~ + + # Annotations to add to scheduler kubernetes service account. + annotations: {} + + # Scheduler pod disruption budget + podDisruptionBudget: + enabled: false + + # PDB configuration + config: + maxUnavailable: 1 + + resources: {} + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + + # This setting tells kubernetes that its ok to evict + # when it wants to scale a node down. + safeToEvict: true + + # Launch additional containers into scheduler. + extraContainers: [] + # Add additional init containers into scheduler. + extraInitContainers: [] + + # Mount additional volumes into scheduler. + extraVolumes: [] + extraVolumeMounts: [] + + # Select certain nodes for airflow scheduler pods. + nodeSelector: {} + affinity: {} + # default scheduler affinity is: + # podAntiAffinity: + # preferredDuringSchedulingIgnoredDuringExecution: + # - podAffinityTerm: + # labelSelector: + # matchLabels: + # component: scheduler + # topologyKey: kubernetes.io/hostname + # weight: 100 + tolerations: [] + topologySpreadConstraints: [] + + priorityClassName: ~ + + podAnnotations: {} + + # Labels specific to scheduler objects and pods + labels: {} + + logGroomerSidecar: + # Whether to deploy the Airflow scheduler log groomer sidecar. + enabled: true + # Command to use when running the Airflow scheduler log groomer sidecar (templated). + command: ~ + # Args to use when running the Airflow scheduler log groomer sidecar (templated). + args: ["bash", "/clean-logs"] + # Number of days to retain logs + retentionDays: 15 + resources: {} + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + + waitForMigrations: + # Whether to create init container to wait for db migrations + enabled: true + env: [] + + env: [] + +# Airflow create user job settings +createUserJob: + # Command to use when running the create user job (templated). 
+ command: ~ + # Args to use when running the create user job (templated). + args: + - "bash" + - "-c" + # The format below is necessary to get `helm lint` happy + - |- + exec \ + airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "users create" "create_user" }} "$@" + - -- + - "-r" + - "{{ .Values.webserver.defaultUser.role }}" + - "-u" + - "{{ .Values.webserver.defaultUser.username }}" + - "-e" + - "{{ .Values.webserver.defaultUser.email }}" + - "-f" + - "{{ .Values.webserver.defaultUser.firstName }}" + - "-l" + - "{{ .Values.webserver.defaultUser.lastName }}" + - "-p" + - "{{ .Values.webserver.defaultUser.password }}" + + # Annotations on the create user job pod + annotations: {} + # jobAnnotations are annotations on the create user job + jobAnnotations: {} + + # Labels specific to createUserJob objects and pods + labels: {} + + # When not set, the values defined in the global securityContext will be used + securityContext: {} + # runAsUser: 50000 + # fsGroup: 0 + # runAsGroup: 0 + + # Create ServiceAccount + serviceAccount: + # Specifies whether a ServiceAccount should be created + create: true + # The name of the ServiceAccount to use. + # If not set and create is true, a name is generated using the release name + name: ~ + + # Annotations to add to create user kubernetes service account. + annotations: {} + + # Launch additional containers into user creation job + extraContainers: [] + + # Mount additional volumes into user creation job + extraVolumes: [] + extraVolumeMounts: [] + + nodeSelector: {} + affinity: {} + tolerations: [] + topologySpreadConstraints: [] + # In case you need to disable the helm hooks that create the jobs after install. + # Disable this if you are using ArgoCD for example + useHelmHooks: true + env: [] + + resources: {} + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + +# Airflow database migration job settings +migrateDatabaseJob: + enabled: true + # Command to use when running the migrate database job (templated). + command: ~ + # Args to use when running the migrate database job (templated). + args: + - "bash" + - "-c" + # The format below is necessary to get `helm lint` happy + - |- + exec \ + airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "db upgrade" "upgradedb" }} + + # Annotations on the database migration pod + annotations: {} + # jobAnnotations are annotations on the database migration job + jobAnnotations: {} + + # When not set, the values defined in the global securityContext will be used + securityContext: {} + # runAsUser: 50000 + # fsGroup: 0 + # runAsGroup: 0 + + # Create ServiceAccount + serviceAccount: + # Specifies whether a ServiceAccount should be created + create: true + # The name of the ServiceAccount to use. + # If not set and create is true, a name is generated using the release name + name: ~ + + # Annotations to add to migrate database job kubernetes service account. + annotations: {} + + resources: {} + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + + # Launch additional containers into database migration job + extraContainers: [] + + # Mount additional volumes into database migration job + extraVolumes: [] + extraVolumeMounts: [] + + nodeSelector: {} + affinity: {} + tolerations: [] + topologySpreadConstraints: [] + # In case you need to disable the helm hooks that create the jobs after install. 
+ # Disable this if you are using ArgoCD for example + useHelmHooks: true + +# Airflow webserver settings +webserver: + allowPodLogReading: true + livenessProbe: + initialDelaySeconds: 15 + timeoutSeconds: 30 + failureThreshold: 20 + periodSeconds: 5 + scheme: HTTP + + readinessProbe: + initialDelaySeconds: 15 + timeoutSeconds: 30 + failureThreshold: 20 + periodSeconds: 5 + scheme: HTTP + + # Number of webservers + replicas: 1 + # Max number of old replicasets to retain + revisionHistoryLimit: ~ + + # Command to use when running the Airflow webserver (templated). + command: ~ + # Args to use when running the Airflow webserver (templated). + args: ["bash", "-c", "exec airflow webserver"] + + # Create ServiceAccount + serviceAccount: + # Specifies whether a ServiceAccount should be created + create: true + # The name of the ServiceAccount to use. + # If not set and create is true, a name is generated using the release name + name: ~ + + # Annotations to add to webserver kubernetes service account. + annotations: {} + + # Webserver pod disruption budget + podDisruptionBudget: + enabled: false + + # PDB configuration + config: + maxUnavailable: 1 + + # Allow overriding Update Strategy for Webserver + strategy: ~ + + # When not set, the values defined in the global securityContext will be used + securityContext: {} + # runAsUser: 50000 + # fsGroup: 0 + # runAsGroup: 0 + + # Additional network policies as needed (Deprecated - renamed to `webserver.networkPolicy.ingress.from`) + extraNetworkPolicies: [] + networkPolicy: + ingress: + # Peers for webserver NetworkPolicy ingress + from: [] + # Ports for webserver NetworkPolicy ingress (if `from` is set) + ports: + - port: "{{ .Values.ports.airflowUI }}" + + resources: {} + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + + # Create initial user. + defaultUser: + enabled: true + role: Admin + username: admin + email: admin@example.com + firstName: admin + lastName: user + password: admin + + # Launch additional containers into webserver. + extraContainers: [] + # Add additional init containers into webserver. + extraInitContainers: [] + + # Mount additional volumes into webserver. + extraVolumes: [] + extraVolumeMounts: [] + + # This string (can be templated) will be mounted into the Airflow Webserver as a custom + # webserver_config.py. You can bake a webserver_config.py in to your image instead. + webserverConfig: ~ + # webserverConfig: | + # from airflow import configuration as conf + + # # The SQLAlchemy connection string. + # SQLALCHEMY_DATABASE_URI = conf.get('database', 'SQL_ALCHEMY_CONN') + + # # Flask-WTF flag for CSRF + # CSRF_ENABLED = True + + service: + type: ClusterIP + ## service annotations + annotations: {} + ports: + - name: airflow-ui + port: "{{ .Values.ports.airflowUI }}" + # To change the port used to access the webserver: + # ports: + # - name: airflow-ui + # port: 80 + # targetPort: airflow-ui + # To only expose a sidecar, not the webserver directly: + # ports: + # - name: only_sidecar + # port: 80 + # targetPort: 8888 + loadBalancerIP: ~ + ## Limit load balancer source ips to list of CIDRs + # loadBalancerSourceRanges: + # - "10.123.0.0/16" + loadBalancerSourceRanges: [] + + # Select certain nodes for airflow webserver pods. 
+ nodeSelector: {} + priorityClassName: ~ + affinity: {} + # default webserver affinity is: + # podAntiAffinity: + # preferredDuringSchedulingIgnoredDuringExecution: + # - podAffinityTerm: + # labelSelector: + # matchLabels: + # component: webserver + # topologyKey: kubernetes.io/hostname + # weight: 100 + tolerations: [] + topologySpreadConstraints: [] + + podAnnotations: {} + + # Labels specific webserver app + labels: {} + + waitForMigrations: + # Whether to create init container to wait for db migrations + enabled: true + env: [] + + env: [] + +# Airflow Triggerer Config +triggerer: + enabled: true + # Number of airflow triggerers in the deployment + replicas: 1 + # Max number of old replicasets to retain + revisionHistoryLimit: ~ + + # Command to use when running Airflow triggerers (templated). + command: ~ + # Args to use when running Airflow triggerer (templated). + args: ["bash", "-c", "exec airflow triggerer"] + + # Update Strategy for triggerers + strategy: + rollingUpdate: + maxSurge: "100%" + maxUnavailable: "50%" + + # If the triggerer stops heartbeating for 5 minutes (5*60s) kill the + # triggerer and let Kubernetes restart it + livenessProbe: + initialDelaySeconds: 10 + timeoutSeconds: 20 + failureThreshold: 5 + periodSeconds: 60 + command: ~ + + # Create ServiceAccount + serviceAccount: + # Specifies whether a ServiceAccount should be created + create: true + # The name of the ServiceAccount to use. + # If not set and create is true, a name is generated using the release name + name: ~ + + # Annotations to add to triggerer kubernetes service account. + annotations: {} + + # When not set, the values defined in the global securityContext will be used + securityContext: {} + # runAsUser: 50000 + # fsGroup: 0 + # runAsGroup: 0 + + resources: {} + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + + # Grace period for triggerer to finish after SIGTERM is sent from kubernetes + terminationGracePeriodSeconds: 60 + + # This setting tells kubernetes that its ok to evict + # when it wants to scale a node down. + safeToEvict: true + + # Launch additional containers into triggerer. + extraContainers: [] + # Add additional init containers into triggerers. + extraInitContainers: [] + + # Mount additional volumes into triggerer. + extraVolumes: [] + extraVolumeMounts: [] + + # Select certain nodes for airflow triggerer pods. + nodeSelector: {} + affinity: {} + # default triggerer affinity is: + # podAntiAffinity: + # preferredDuringSchedulingIgnoredDuringExecution: + # - podAffinityTerm: + # labelSelector: + # matchLabels: + # component: triggerer + # topologyKey: kubernetes.io/hostname + # weight: 100 + tolerations: [] + topologySpreadConstraints: [] + + priorityClassName: ~ + + podAnnotations: {} + + # Labels specific to triggerer objects and pods + labels: {} + + waitForMigrations: + # Whether to create init container to wait for db migrations + enabled: true + env: [] + + env: [] + +# Airflow Dag Processor Config +dagProcessor: + enabled: false + # Number of airflow dag processors in the deployment + replicas: 1 + # Max number of old replicasets to retain + revisionHistoryLimit: ~ + + # Command to use when running Airflow dag processors (templated). + command: ~ + # Args to use when running Airflow dag processor (templated). 
+ args: ["bash", "-c", "exec airflow dag-processor"] + + # Update Strategy for dag processors + strategy: + rollingUpdate: + maxSurge: "100%" + maxUnavailable: "50%" + + # If the dag processor stops heartbeating for 5 minutes (5*60s) kill the + # dag processor and let Kubernetes restart it + livenessProbe: + initialDelaySeconds: 10 + timeoutSeconds: 20 + failureThreshold: 5 + periodSeconds: 60 + command: ~ + + # Create ServiceAccount + serviceAccount: + # Specifies whether a ServiceAccount should be created + create: true + # The name of the ServiceAccount to use. + # If not set and create is true, a name is generated using the release name + name: ~ + + # Annotations to add to dag processor kubernetes service account. + annotations: {} + + # When not set, the values defined in the global securityContext will be used + securityContext: {} + # runAsUser: 50000 + # fsGroup: 0 + # runAsGroup: 0 + + resources: {} + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + + # Grace period for dag processor to finish after SIGTERM is sent from kubernetes + terminationGracePeriodSeconds: 60 + + # This setting tells kubernetes that its ok to evict + # when it wants to scale a node down. + safeToEvict: true + + # Launch additional containers into dag processor. + extraContainers: [] + # Add additional init containers into dag processors. + extraInitContainers: [] + + # Mount additional volumes into dag processor. + extraVolumes: [] + extraVolumeMounts: [] + + # Select certain nodes for airflow dag processor pods. + nodeSelector: {} + affinity: {} + # default dag processor affinity is: + # podAntiAffinity: + # preferredDuringSchedulingIgnoredDuringExecution: + # - podAffinityTerm: + # labelSelector: + # matchLabels: + # component: dag-processor + # topologyKey: kubernetes.io/hostname + # weight: 100 + tolerations: [] + topologySpreadConstraints: [] + + priorityClassName: ~ + + podAnnotations: {} + + waitForMigrations: + # Whether to create init container to wait for db migrations + enabled: true + env: [] + + env: [] + +# Flower settings +flower: + # Enable flower. + # If True, and using CeleryExecutor/CeleryKubernetesExecutor, will deploy flower app. + enabled: false + # Max number of old replicasets to retain + revisionHistoryLimit: ~ + + # Command to use when running flower (templated). + command: ~ + # Args to use when running flower (templated). + args: + - "bash" + - "-c" + # The format below is necessary to get `helm lint` happy + - |- + exec \ + airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "celery flower" "flower" }} + + # Additional network policies as needed (Deprecated - renamed to `flower.networkPolicy.ingress.from`) + extraNetworkPolicies: [] + networkPolicy: + ingress: + # Peers for flower NetworkPolicy ingress + from: [] + # Ports for flower NetworkPolicy ingress (if ingressPeers is set) + ports: + - port: "{{ .Values.ports.flowerUI }}" + + resources: {} + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + + # When not set, the values defined in the global securityContext will be used + securityContext: {} + # runAsUser: 50000 + # fsGroup: 0 + # runAsGroup: 0 + + # Create ServiceAccount + serviceAccount: + # Specifies whether a ServiceAccount should be created + create: true + # The name of the ServiceAccount to use. + # If not set and create is true, a name is generated using the release name + name: ~ + + # Annotations to add to worker kubernetes service account. 
+ annotations: {} + + # A secret containing the connection + secretName: ~ + + # Else, if username and password are set, create secret from username and password + username: ~ + password: ~ + + service: + type: ClusterIP + ## service annotations + annotations: {} + ports: + - name: flower-ui + port: "{{ .Values.ports.flowerUI }}" + # To change the port used to access flower: + # ports: + # - name: flower-ui + # port: 8080 + # targetPort: flower-ui + loadBalancerIP: ~ + ## Limit load balancer source ips to list of CIDRs + # loadBalancerSourceRanges: + # - "10.123.0.0/16" + loadBalancerSourceRanges: [] + + # Launch additional containers into the flower pods. + extraContainers: [] + # Mount additional volumes into the flower pods. + extraVolumes: [] + extraVolumeMounts: [] + + # Select certain nodes for airflow flower pods. + nodeSelector: {} + affinity: {} + tolerations: [] + topologySpreadConstraints: [] + + priorityClassName: ~ + + podAnnotations: {} + + # Labels specific to flower objects and pods + labels: {} + env: [] + +# StatsD settings +statsd: + enabled: true + # Max number of old replicasets to retain + revisionHistoryLimit: ~ + + # Create ServiceAccount + serviceAccount: + # Specifies whether a ServiceAccount should be created + create: true + # The name of the ServiceAccount to use. + # If not set and create is true, a name is generated using the release name + name: ~ + + # Annotations to add to worker kubernetes service account. + annotations: {} + + uid: 65534 + # When not set, `statsd.uid` will be used + securityContext: {} + # runAsUser: 65534 + # fsGroup: 0 + # runAsGroup: 0 + + # Additional network policies as needed + extraNetworkPolicies: [] + resources: {} + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + + service: + extraAnnotations: {} + + # Select certain nodes for StatsD pods. + nodeSelector: {} + affinity: {} + tolerations: [] + topologySpreadConstraints: [] + + priorityClassName: ~ + + # Additional mappings for StatsD exporter. + # If set, will merge default mapping and extra mappings, default mapping has higher priority. + # So, if you want to change some default mapping, please use `overrideMappings` + extraMappings: [] + + # Override mappings for StatsD exporter. + # If set, will ignore setting item in default and `extraMappings`. + # So, If you use it, ensure all mapping item contains in it. + overrideMappings: [] + + podAnnotations: {} + +# PgBouncer settings +pgbouncer: + # Enable PgBouncer + enabled: false + # Max number of old replicasets to retain + revisionHistoryLimit: ~ + # Command to use for PgBouncer(templated). + command: ["pgbouncer", "-u", "nobody", "/etc/pgbouncer/pgbouncer.ini"] + # Args to use for PgBouncer(templated). + args: ~ + auth_type: md5 + auth_file: /etc/pgbouncer/users.txt + + # Create ServiceAccount + serviceAccount: + # Specifies whether a ServiceAccount should be created + create: true + # The name of the ServiceAccount to use. + # If not set and create is true, a name is generated using the release name + name: ~ + + # Annotations to add to worker kubernetes service account. 
+ annotations: {} + + # Additional network policies as needed + extraNetworkPolicies: [] + + # Pool sizes + metadataPoolSize: 10 + resultBackendPoolSize: 5 + + # Maximum clients that can connect to PgBouncer (higher = more file descriptors) + maxClientConn: 100 + + # supply the name of existing secret with pgbouncer.ini and users.txt defined + # you can load them to a k8s secret like the one below + # apiVersion: v1 + # kind: Secret + # metadata: + # name: pgbouncer-config-secret + # data: + # pgbouncer.ini: + # users.txt: + # type: Opaque + # + # configSecretName: pgbouncer-config-secret + # + configSecretName: ~ + + # PgBouncer pod disruption budget + podDisruptionBudget: + enabled: false + + # PDB configuration + config: + maxUnavailable: 1 + + # Limit the resources to PgBouncer. + # When you specify the resource request the k8s scheduler uses this information to decide which node to + # place the Pod on. When you specify a resource limit for a Container, the kubelet enforces those limits so + # that the running container is not allowed to use more of that resource than the limit you set. + # See: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + # Example: + # + # resource: + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + resources: {} + + service: + extraAnnotations: {} + + # https://www.pgbouncer.org/config.html + verbose: 0 + logDisconnections: 0 + logConnections: 0 + + sslmode: "prefer" + ciphers: "normal" + + ssl: + ca: ~ + cert: ~ + key: ~ + + # Add extra PgBouncer ini configuration in the databases section: + # https://www.pgbouncer.org/config.html#section-databases + extraIniMetadata: ~ + extraIniResultBackend: ~ + # Add extra general PgBouncer ini configuration: https://www.pgbouncer.org/config.html + extraIni: ~ + + # Mount additional volumes into pgbouncer. + extraVolumes: [] + extraVolumeMounts: [] + + # Select certain nodes for PgBouncer pods. + nodeSelector: {} + affinity: {} + tolerations: [] + topologySpreadConstraints: [] + + priorityClassName: ~ + + uid: 65534 + + metricsExporterSidecar: + resources: {} + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + sslmode: "disable" + +# Configuration for the redis provisioned by the chart +redis: + enabled: true + terminationGracePeriodSeconds: 600 + + # Create ServiceAccount + serviceAccount: + # Specifies whether a ServiceAccount should be created + create: true + # The name of the ServiceAccount to use. + # If not set and create is true, a name is generated using the release name + name: ~ + + # Annotations to add to worker kubernetes service account. + annotations: {} + + persistence: + # Enable persistent volumes + enabled: true + # Volume size for worker StatefulSet + size: 1Gi + # If using a custom storageClass, pass name ref to all statefulSets here + storageClassName: + # Annotations to add to redis volumes + annotations: {} + + resources: {} + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + + # If set use as redis secret. Make sure to also set data.brokerUrlSecretName value. + passwordSecretName: ~ + + # Else, if password is set, create secret with it, + # Otherwise a new password will be generated on install + # Note: password can only be set during install, not upgrade. + password: ~ + + # This setting tells kubernetes that its ok to evict + # when it wants to scale a node down. + safeToEvict: true + + # Select certain nodes for redis pods. 
+ nodeSelector: {} + affinity: {} + tolerations: [] + topologySpreadConstraints: [] + + # Set to 0 for backwards-compatiblity + uid: 0 + # If not set, `redis.uid` will be used + securityContext: {} + # runAsUser: 999 + # runAsGroup: 0 + + podAnnotations: {} +# Auth secret for a private registry +# This is used if pulling airflow images from a private registry +registry: + secretName: ~ + + # Example: + # connection: + # user: ~ + # pass: ~ + # host: ~ + # email: ~ + connection: {} + +# Elasticsearch logging configuration +elasticsearch: + # Enable elasticsearch task logging + enabled: false + # A secret containing the connection + secretName: ~ + # Or an object representing the connection + # Example: + # connection: + # user: ~ + # pass: ~ + # host: ~ + # port: ~ + connection: {} + +# All ports used by chart +ports: + flowerUI: 5555 + airflowUI: 8080 + workerLogs: 8793 + redisDB: 6379 + statsdIngest: 9125 + statsdScrape: 9102 + pgbouncer: 6543 + pgbouncerScrape: 9127 + +# Define any ResourceQuotas for namespace +quotas: {} + +# Define default/max/min values for pods and containers in namespace +limits: [] + +# This runs as a CronJob to cleanup old pods. +cleanup: + enabled: false + # Run every 15 minutes + schedule: "*/15 * * * *" + # Command to use when running the cleanup cronjob (templated). + command: ~ + # Args to use when running the cleanup cronjob (templated). + args: ["bash", "-c", "exec airflow kubernetes cleanup-pods --namespace={{ .Release.Namespace }}"] + + + # Select certain nodes for airflow cleanup pods. + nodeSelector: {} + affinity: {} + tolerations: [] + topologySpreadConstraints: [] + + podAnnotations: {} + + # Labels specific to cleanup objects and pods + labels: {} + + resources: {} + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + + # Create ServiceAccount + serviceAccount: + # Specifies whether a ServiceAccount should be created + create: true + # The name of the ServiceAccount to use. + # If not set and create is true, a name is generated using the release name + name: ~ + + # Annotations to add to cleanup cronjob kubernetes service account. + annotations: {} + + # When not set, the values defined in the global securityContext will be used + securityContext: {} + # runAsUser: 50000 + # runAsGroup: 0 + env: [] + + # Specify history limit + # When set, overwrite the default k8s number of successful and failed CronJob executions that are saved. + failedJobsHistoryLimit: ~ + successfulJobsHistoryLimit: ~ + +# Configuration for postgresql subchart +# Not recommended for production +postgresql: + enabled: true + postgresqlPassword: postgres + postgresqlUsername: postgres + +# Config settings to go into the mounted airflow.cfg +# +# Please note that these values are passed through the `tpl` function, so are +# all subject to being rendered as go templates. If you need to include a +# literal `{{` in a value, it must be expressed like this: +# +# a: '{{ "{{ not a template }}" }}' +# +# Do not set config containing secrets via plain text values, use Env Var or k8s secret object +# yamllint disable rule:line-length +config: + core: + dags_folder: '{{ include "airflow_dags" . 
}}' + # This is ignored when used with the official Docker image + load_examples: 'False' + executor: '{{ .Values.executor }}' + # For Airflow 1.10, backward compatibility; moved to [logging] in 2.0 + colored_console_log: 'False' + remote_logging: '{{- ternary "True" "False" .Values.elasticsearch.enabled }}' + max_active_tasks_per_dag: 1024 + parallelism: 1024 + api: + auth_backend: airflow.api.auth.backend.basic_auth + logging: + remote_logging: '{{- ternary "True" "False" .Values.elasticsearch.enabled }}' + colored_console_log: 'False' + metrics: + statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}' + statsd_port: 9125 + statsd_prefix: airflow + statsd_host: '{{ printf "%s-statsd" .Release.Name }}' + webserver: + enable_proxy_fix: 'True' + # For Airflow 1.10 + rbac: 'True' + celery: + flower_url_prefix: '{{ .Values.ingress.flower.path }}' + worker_concurrency: 16 + scheduler: + standalone_dag_processor: '{{ ternary "True" "False" .Values.dagProcessor.enabled }}' + # statsd params included for Airflow 1.10 backward compatibility; moved to [metrics] in 2.0 + statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}' + statsd_port: 9125 + statsd_prefix: airflow + statsd_host: '{{ printf "%s-statsd" .Release.Name }}' + # `run_duration` included for Airflow 1.10 backward compatibility; removed in 2.0. + run_duration: 41460 + elasticsearch: + json_format: 'True' + log_id_template: "{dag_id}_{task_id}_{execution_date}_{try_number}" + elasticsearch_configs: + max_retries: 3 + timeout: 30 + retry_timeout: 'True' + kerberos: + keytab: '{{ .Values.kerberos.keytabPath }}' + reinit_frequency: '{{ .Values.kerberos.reinitFrequency }}' + principal: '{{ .Values.kerberos.principal }}' + ccache: '{{ .Values.kerberos.ccacheMountPath }}/{{ .Values.kerberos.ccacheFileName }}' + celery_kubernetes_executor: + kubernetes_queue: 'kubernetes' + kubernetes: + namespace: '{{ .Release.Namespace }}' + airflow_configmap: '{{ include "airflow_config" . }}' + airflow_local_settings_configmap: '{{ include "airflow_config" . }}' + pod_template_file: '{{ include "airflow_pod_template_file" . }}/pod_template_file.yaml' + worker_container_repository: '{{ .Values.images.airflow.repository | default .Values.defaultAirflowRepository }}' + worker_container_tag: '{{ .Values.images.airflow.tag | default .Values.defaultAirflowTag }}' + multi_namespace_mode: '{{ ternary "True" "False" .Values.multiNamespaceMode }}' +# yamllint enable rule:line-length + +# Whether Airflow can launch workers and/or pods in multiple namespaces +# If true, it creates ClusterRole/ClusterRolebinding (with access to entire cluster) +multiNamespaceMode: false + +# `podTemplate` is a templated string containing the contents of `pod_template_file.yaml` used for +# KubernetesExecutor workers. The default `podTemplate` will use normal `workers` configuration parameters +# (e.g. `workers.resources`). As such, you normally won't need to override this directly, however, +# you can still provide a completely custom `pod_template_file.yaml` if desired. +# If not set, a default one is created using `files/pod-template-file.kubernetes-helm-yaml`. +podTemplate: ~ +# The following example is NOT functional, but meant to be illustrative of how you can provide a custom +# `pod_template_file`. You're better off starting with the default in +# `files/pod-template-file.kubernetes-helm-yaml` and modifying from there. 
+# We will set `priorityClassName` in this example: +# podTemplate: | +# apiVersion: v1 +# kind: Pod +# metadata: +# name: dummy-name +# labels: +# tier: airflow +# component: worker +# release: {{ .Release.Name }} +# spec: +# priorityClassName: high-priority +# containers: +# - name: base +# ... + +# Git sync +dags: + persistence: + # Enable persistent volume for storing dags + enabled: false + # Volume size for dags + size: 1Gi + # If using a custom storageClass, pass name here + storageClassName: + # access mode of the persistent volume + accessMode: ReadWriteOnce + ## the name of an existing PVC to use + existingClaim: + ## optional subpath for dag volume mount + subPath: + gitSync: + enabled: false + + # git repo clone url + # ssh examples ssh://git@github.com/apache/airflow.git + # git@github.com:apache/airflow.git + # https example: https://github.com/apache/airflow.git + # the number of consecutive failures allowed before aborting + maxFailures: 0 + # subpath within the repo where dags are located + # should be "" if dags are at repo root + subPath: "" + # if your repo needs a user name password + # you can load them to a k8s secret like the one below + # --- + # apiVersion: v1 + # kind: Secret + # metadata: + # name: git-credentials + # data: + # GIT_SYNC_USERNAME: + # GIT_SYNC_PASSWORD: + # and specify the name of the secret below + # + # credentialsSecret: git-credentials + # + # + # If you are using an ssh clone url, you can load + # the ssh private key to a k8s secret like the one below + # --- + # apiVersion: v1 + # kind: Secret + # metadata: + # name: airflow-ssh-secret + # data: + # # key needs to be gitSshKey + # gitSshKey: + # and specify the name of the secret below + # sshKeySecret: airflow-ssh-secret + # + # If you are using an ssh private key, you can additionally + # specify the content of your known_hosts file, example: + # + # knownHosts: | + # gitlab.ethz.ch ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAICK9KTASXGLnjYdaSgVZgqkQFELM2WxSkXKOSceBUQpF + # gitlab.ethz.ch ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBIMSQu1TqCpCLoA4Qt7imrgYIntQkBU3ton7Yh4fKEUjPHm9/f2B0RRhJOJ75clcRPt587NnAZrcFM8SA60+dXg= + # interval between git sync attempts in seconds + wait: 60 + containerName: git-sync + uid: 65533 + + # When not set, the values defined in the global securityContext will be used + securityContext: {} + # runAsUser: 65533 + # runAsGroup: 0 + + extraVolumeMounts: [] + env: [] + resources: {} + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + +logs: + persistence: + # Enable persistent volume for storing logs + enabled: false + # Volume size for logs + size: 1Gi + # If using a custom storageClass, pass name here + storageClassName: + ## the name of an existing PVC to use + existingClaim: + diff --git a/scripts/analyze_per_task_latency.py b/scripts/analyze_per_task_latency.py new file mode 100644 index 000000000..d2825fec4 --- /dev/null +++ b/scripts/analyze_per_task_latency.py @@ -0,0 +1,71 @@ +import collections +import json +import re +import sys + +import pandas as pd + + +data = collections.defaultdict(lambda: []) +data_compact = collections.defaultdict(lambda: {}) + +pd.options.display.float_format = '{:f}'.format + +if len(sys.argv) < 2: + print("Usage: analyze_per_task_latency.py ") + exit(1) +with open(sys.argv[1]) as f: + scheduler_log_data = f.read() + +ts_types = set() +for line in scheduler_log_data.split('\n'): + m = re.match(r".*TIMING: (.*)", line) + if m: + line_data = json.loads(m.group(1)) + 
        ts_types.update((f"{line_data['function']}_{x}" for x in line_data["timestamp_annotations"]))
+for line in scheduler_log_data.split('\n'):
+    m = re.match(r".*TIMING: (.*)", line)
+    if m:
+        line_data = json.loads(m.group(1))
+        log_key = (line_data["dag_id"], line_data["task_id"], line_data["run_id"], line_data["map_index"], line_data["try_number"])
+
+        for key in set(line_data.keys()) - {"times", "timestamp_annotations"}:
+            data[key].append(line_data[key])
+        line_ts_annotations = tuple(f'{line_data["function"]}_{x}' for x in line_data["timestamp_annotations"])
+        for ts, ts_type in zip(line_data["times"], line_ts_annotations):
+            data[ts_type].append(ts)
+            data_compact[log_key][ts_type] = ts
+        for ts_type in ts_types.difference(line_ts_annotations):
+            data[ts_type].append(None)
+
+reshaped_compact_data = collections.defaultdict(list)
+for log_key, timestamps in data_compact.items():
+    dag_id, task_id, run_id, map_index, try_number = log_key
+    reshaped_compact_data["dag_id"].append(dag_id)
+    reshaped_compact_data["task_id"].append(task_id)
+    reshaped_compact_data["run_id"].append(run_id)
+    reshaped_compact_data["map_index"].append(map_index)
+    reshaped_compact_data["try_number"].append(try_number)
+    for ts_type, ts in timestamps.items():
+        reshaped_compact_data[ts_type].append(ts)
+
+# df = pd.DataFrame(data)
+df_compact = pd.DataFrame(reshaped_compact_data)
+df_w1 = df_compact[df_compact['dag_id'].str.contains("_w1_")]
+df_send_post = df_w1["executor_async_task_before_post_request"] - df_w1["executor_run_pod_async_function_entry"]
+df_receive_post = df_w1["flask_run_task_instance_function_entry"] - df_w1["executor_run_pod_async_function_entry"]
+df_function_started = df_w1["flask_run_task_instance_after_start_subprocess"] - df_w1["executor_run_pod_async_function_entry"]
+df_function_finished = df_w1["flask_run_task_instance_after_finished_subprocess"] - df_w1["executor_run_pod_async_function_entry"]
+df_task_finished = df_w1["executor_async_task_function_exit"] - df_w1["executor_run_pod_async_function_entry"]
+
+stats = []
+pretty_names = ["POST request sent","POST request received","Subprocess started","Subprocess done","Total"]
+dataframes = [df_send_post, df_receive_post, df_function_started, df_function_finished, df_task_finished]
+print("All reported as min, median, mean, max")
+for name, df in zip(pretty_names, dataframes):
+    stats.append([df.min(), df.median(), df.mean(), df.max()])
+    print(f"{name}: {df.min():.2f}, {df.median():.2f}, {df.mean():.2f}, {df.max():.2f}")
+
+print("For latex:")
+for name, stats_line in zip(pretty_names, stats):
+    print(name + " & " + " & ".join(map(lambda x: format(x, ".2f"), stats_line)) + r" \\")
diff --git a/scripts/build_profiling_image.sh b/scripts/build_profiling_image.sh
new file mode 100755
index 000000000..c5a3e74cd
--- /dev/null
+++ b/scripts/build_profiling_image.sh
@@ -0,0 +1,12 @@
+#!/usr/bin/zsh
+
+function cleanup {
+    rm -rf workflows/image/airflow
+}
+
+trap cleanup EXIT
+
+cleanup
+cp -r airflow workflows/image/airflow
+(cd ./workflows/image && docker build -f Dockerfile_profiling . -t airflow-worker:latest) || { echo "Failed to build airflow-worker:latest"; exit 1; }
+echo Built airflow-worker:latest with profiling support
diff --git a/scripts/plot_e2e_latency.py b/scripts/plot_e2e_latency.py
new file mode 100644
index 000000000..1ee2605be
--- /dev/null
+++ b/scripts/plot_e2e_latency.py
@@ -0,0 +1,61 @@
+import collections
+import pathlib
+import re
+
+import matplotlib.pyplot as plt
+import matplotlib.patches as mpatches
+
+
+def get_end_to_end_latencies(benchmark_logs_path, filter_func):
+    data_by_depth = collections.defaultdict(lambda: [])
+
+    for gateway_log_path in benchmark_logs_path.glob("log_gateway*"):
+        with open(gateway_log_path) as f:
+            lines = f.readlines()
+        for line in lines:
+            m = re.match(r"Running (?P<workflow>\S+) took ((?P<minutes>[0-9]+)m)?(?P<seconds>[0-9.]+)s", line)
+            if m:
+                workflow = m.group("workflow")
+                if not filter_func(workflow):
+                    continue
+                time_seconds = float(m.group("seconds"))
+                time_minutes = m.group("minutes")
+                if time_minutes is not None:
+                    time_seconds += int(time_minutes) * 60
+                depth = int(workflow.split("_d")[-1])
+                data_by_depth[depth].append(time_seconds)
+    return data_by_depth
+
+
+benchmark_knative_logs_path = pathlib.Path("./benchmark_data/benchmarking_logs_1676206749")
+data_knative = get_end_to_end_latencies(benchmark_knative_logs_path, lambda x: "_w1_" in x)
+benchmark_stock_logs_path = pathlib.Path("./benchmark_data/benchmarking_logs_stock_1676208700")
+data_stock = get_end_to_end_latencies(benchmark_stock_logs_path, lambda x: "_w1_" in x)
+
+print(f"Found data for {len(data_knative)} different workflows.")
+
+# Ian Hincks, https://stackoverflow.com/questions/33864578/matplotlib-making-labels-for-violin-plots, 2023-02-13
+labels = []
+def add_label(violin, label):
+    color = violin["bodies"][0].get_facecolor().flatten()
+    labels.append((mpatches.Patch(color=color), label))
+
+def do_violinplot_with_label(data, label):
+    x_values = []
+    y_values = []
+    for depth, values in data.items():
+        x_values.append(depth)
+        y_values.append(values)
+    p = plt.violinplot(y_values, x_values, widths=0.75)
+    add_label(p, label)
+
+do_violinplot_with_label(data_knative, "Airflow + Knative")
+do_violinplot_with_label(data_stock, "Airflow w/o Knative")
+
+plt.xlabel("Workflow Depth [number of functions]")
+plt.ylabel("Latency [s]")
+plt.title("End-to-End latency comparison between Airflow+Knative\nand stock Airflow for various workflow depths")
+plt.legend(*zip(*labels), loc="upper left", bbox_to_anchor=(0, -0.15), ncol=2)
+output_path = "latency_depth.pdf"
+print(f"Saving plot to {output_path}")
+plt.savefig(output_path, bbox_inches='tight')
diff --git a/scripts/plot_throughput_width.py b/scripts/plot_throughput_width.py
new file mode 100644
index 000000000..e8d5f953c
--- /dev/null
+++ b/scripts/plot_throughput_width.py
@@ -0,0 +1,69 @@
+import collections
+import pathlib
+import re
+
+import matplotlib.pyplot as plt
+import matplotlib.patches as mpatches
+
+
+# extract throughput information from gateway server logs
+def get_throughput(benchmark_logs_path, filter_func):
+    data_by_width = collections.defaultdict(lambda: [])
+
+    for gateway_log_path in benchmark_logs_path.glob("log_gateway*"):
+        with open(gateway_log_path) as f:
+            lines = f.readlines()
+        for line in lines:
+            m = re.match(r"Running (?P<workflow>\S+) took ((?P<minutes>[0-9]+)m)?(?P<seconds>[0-9.]+)s", line)
+            if m:
+                workflow = m.group("workflow")
+                if not filter_func(workflow):
+                    continue
+                time_seconds = float(m.group("seconds"))
+                time_minutes = m.group("minutes")
+                if time_minutes is not None:
+                    time_seconds += int(time_minutes) * 60
+                width = int(re.match(r".*_w([0-9]+)_", workflow).group(1))
+                throughput = (2 + width) / time_seconds
+                data_by_width[width].append(throughput)
+    return data_by_width
+
+benchmark_knative_logs_path = pathlib.Path("./benchmark_data/benchmarking_logs_1676206749")
+data_knative = get_throughput(benchmark_knative_logs_path, lambda x: "_d3" in x)
+benchmark_stock_logs_path = pathlib.Path("./benchmark_data/benchmarking_logs_stock_1676240305")
+data_stock = get_throughput(benchmark_stock_logs_path, lambda x: "_d3" in x)
+
+print(f"Found data for {len(data_knative)} different workflows.")
+
+# Ian Hincks, https://stackoverflow.com/questions/33864578/matplotlib-making-labels-for-violin-plots, 2023-02-13
+labels = []
+def add_label(violin, label):
+    color = violin["bodies"][0].get_facecolor().flatten()
+    labels.append((mpatches.Patch(color=color), label))
+
+
+def do_violinplot_with_label(data, label):
+    x_values = []
+    y_values = []
+    count = 0
+    sum_throughput = 0
+    for width, values in data.items():
+        x_values.append(width)
+        y_values.append(values)
+        sum_throughput += sum(values)
+        count += len(values)
+    print(f"Average throughput for {label}: {sum_throughput/count:.2f} tasks / s")
+    p = plt.violinplot(y_values, x_values, widths=1.5)
+    add_label(p, label)
+
+
+do_violinplot_with_label(data_knative, "Airflow + Knative")
+do_violinplot_with_label(data_stock, "Airflow w/o Knative")
+
+plt.xlabel("Workflow Width [number of parallel functions]")
+plt.ylabel("Throughput [tasks/s]")
+plt.title("Throughput comparison between Airflow+Knative\nand stock Airflow for various workflow widths")
+plt.legend(*zip(*labels), loc="upper left", bbox_to_anchor=(0, -0.15), ncol=2)
+output_path = "throughput_width.pdf"
+print(f"Saving plot to {output_path}")
+plt.savefig(output_path, bbox_inches='tight')
diff --git a/scripts/run_benchmarks.sh b/scripts/run_benchmarks.sh
new file mode 100755
index 000000000..09117a8d9
--- /dev/null
+++ b/scripts/run_benchmarks.sh
@@ -0,0 +1,55 @@
+#!/bin/bash
+
+set -x
+kubectl delete namespace airflow
+./scripts/setup_airflow.sh
+
+# collect data from modified Airflow with Knative
+log_dir=./benchmarking_logs_"$(date +%s)"
+mkdir -p "$log_dir"
+
+for benchmark in $(cd workflows/knative_yamls && ls | grep benchmark); do
+  echo Doing benchmark "$benchmark"
+  kn service delete -n airflow --all
+  sleep 5
+  while [[ $(kubectl -n airflow get pods | grep airflow-worker.*Terminating) ]]; do sleep 1; done
+  while [[ ! $(kubectl -n airflow get pods | grep webserver.*1/1.*Running) ]]; do sleep 1; done
+  kn service apply -f workflow-gateway/workflow-gateway.yaml -n airflow
+  ./scripts/deploy_workflow.sh "$benchmark"
+  scheduler="$(kubectl -n airflow get pods | grep scheduler | awk '{print $1}')"
+  GATEWAY_URL='http://airflow-workflow-gateway.airflow.192.168.1.240.sslip.io'
+  for i in {1..5}; do
+    echo running iteration "$i"
+    curl -u admin:admin -X POST -H 'Content-Type: application/json' --data '{}' "$GATEWAY_URL"/runWorkflow/"$benchmark"
+    kubectl -n airflow logs "$scheduler" scheduler | grep TIMING > "$log_dir"/log_timing_"$benchmark"_"$i".txt
+    sleep 1
+  done
+  kubectl -n airflow logs "$scheduler" scheduler > "$log_dir"/log_scheduler_"$benchmark".txt
+  gateway="$(kubectl -n airflow get pods | grep gateway | awk '{print $1}')"
+  kubectl -n airflow logs "$gateway" user-container > "$log_dir"/log_gateway_"$benchmark".txt
+  sleep 1
+done
+
+# collect data for stock Airflow deployment
+log_dir=./benchmarking_logs_stock_"$(date +%s)"
+mkdir -p "$log_dir"
+
+kubectl delete namespace airflow
+./scripts/setup_stock_airflow.sh
+kn service apply -f workflow-gateway/workflow-gateway.yaml -n airflow
+for benchmark in $(cd workflows/knative_yamls && ls | grep benchmark); do
+  echo Doing benchmark "$benchmark"
+  sleep 5
+  scheduler="$(kubectl -n airflow get pods | grep scheduler | awk '{print $1}')"
+  GATEWAY_URL='http://airflow-workflow-gateway.airflow.192.168.1.240.sslip.io'
+  for i in {1..5}; do
+    echo running iteration "$i"
+    curl -u admin:admin -X POST -H 'Content-Type: application/json' --data '{}' "$GATEWAY_URL"/runWorkflow/"$benchmark"
+    kubectl -n airflow logs "$scheduler" scheduler | grep TIMING > "$log_dir"/log_timing_"$benchmark"_"$i".txt
+    sleep 1
+  done
+  kubectl -n airflow logs "$scheduler" scheduler > "$log_dir"/log_scheduler_"$benchmark".txt
+  gateway="$(kubectl -n airflow get pods | grep gateway | awk '{print $1}')"
+  kubectl -n airflow logs "$gateway" user-container > "$log_dir"/log_gateway_"$benchmark".txt
+  sleep 1
+done
diff --git a/scripts/setup_stock_airflow.sh b/scripts/setup_stock_airflow.sh
new file mode 100755
index 000000000..a706435d6
--- /dev/null
+++ b/scripts/setup_stock_airflow.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+# increase max open files
+ulimit -n 1000000
+
+# install helm
+curl https://baltocdn.com/helm/signing.asc | gpg --dearmor | sudo tee /usr/share/keyrings/helm.gpg > /dev/null
+sudo apt-get install apt-transport-https --yes
+echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/helm.gpg] https://baltocdn.com/helm/stable/debian/ all main" | sudo tee /etc/apt/sources.list.d/helm-stable-debian.list
+sudo apt-get update
+sudo apt-get install helm
+
+# setup volumes
+kubectl create namespace airflow
+sudo mkdir -p /mnt/data{0..19}
+sudo chmod 777 /mnt/data*
+kubectl -n airflow apply -f configs/volumes.yaml
+
+# create resource files from airflow helm chart
+helm repo add apache-airflow https://airflow.apache.org
+helm template airflow apache-airflow/airflow --version 1.7.0 --namespace airflow -f configs/values-stock.yaml --debug > configs/airflow-stock.yaml
+
+# deploy airflow
+kubectl -n airflow apply -f configs/airflow-stock.yaml
+while [[ ! $(kubectl -n airflow get pods | grep scheduler.*Running) ]]; do sleep 1; done
+
+# deploy workflow gateway
+# this is the service that lets users run workflows and returns the results
+kn service apply -f workflow-gateway/workflow-gateway.yaml -n airflow
+
+# wait for webserver
+while [[ !
$(kubectl -n airflow get pods | grep webserver.*1/1.*Running) ]]; do sleep 1; done diff --git a/workflows/image/Dockerfile_profiling b/workflows/image/Dockerfile_profiling new file mode 100644 index 000000000..77a05ce23 --- /dev/null +++ b/workflows/image/Dockerfile_profiling @@ -0,0 +1,8 @@ +FROM apache/airflow:2.4.3 +STOPSIGNAL SIGINT +COPY airflow-dags /opt/airflow/dags +COPY airflow /home/airflow/.local/lib/python3.7/site-packages/airflow +COPY flask_worker_entrypoint.py /opt/airflow/flask_worker_entrypoint.py +COPY airflow-with-profiling.py /home/airflow/.local/bin/airflow +ENTRYPOINT ["flask", "--app", "/opt/airflow/flask_worker_entrypoint.py", "run", "--host", "0.0.0.0", "--port", "50000"] +EXPOSE 50000 diff --git a/workflows/image/airflow-with-profiling.py b/workflows/image/airflow-with-profiling.py new file mode 100755 index 000000000..ef416ed7c --- /dev/null +++ b/workflows/image/airflow-with-profiling.py @@ -0,0 +1,10 @@ +#!/usr/local/bin/python +# -*- coding: utf-8 -*- +import re +import sys +import cProfile +from airflow.__main__ import main +if __name__ == '__main__': + sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) + cProfile.run('main()', sort="cumulative") + sys.exit() diff --git a/workflows/image/entrypoint_profiling.py b/workflows/image/entrypoint_profiling.py new file mode 100644 index 000000000..7aee1bb72 --- /dev/null +++ b/workflows/image/entrypoint_profiling.py @@ -0,0 +1,80 @@ +import json +import logging +import os +import pathlib +import time + +from flask import Flask +from flask import request +import subprocess + +app = Flask(__name__) +logging.getLogger().setLevel(logging.INFO) + +class Timer: + def __init__(self, func_name, annotations): + self._times = [] + self._timestamp_annotations = [] + self._func_name = func_name + self._annotations = annotations + + def time(self, annotation: str): + self._times.append(time.time()) + self._timestamp_annotations.append(annotation) + + def get_log_line(self): + return f'TIMING: {json.dumps(self.get_timing_info())}' + + def get_timing_info(self): + timing_info = {"function": self._func_name, "times": self._times, "timestamp_annotations": self._timestamp_annotations} + timing_info.update(self._annotations) + return timing_info + + + def update_annotations(self, new_annotations): + self._annotations.update(new_annotations) + + +@app.route("/run_task_instance", methods=['POST']) +def run_task_instance(): + timer = Timer("flask_run_task_instance", {}) + timer.time("function_entry") + data = request.json + + # setup xcom input and output + annotations = data["annotations"] + timer.update_annotations(annotations) + base_path = pathlib.Path('/home/airflow') / annotations["dag_id"] / annotations["task_id"] / annotations["run_id"] / str(annotations["map_index"]) + os.makedirs(base_path, exist_ok=True) + input_path = base_path / "input" + with open(input_path, "w") as f: + json.dump(data["xcoms"], f) + airflow_output_path = f'{base_path}/output' + try: + os.remove(airflow_output_path) + except FileNotFoundError: + pass + + # execute task + timer.time("after_input_processing") + airflow_stats = open("airflow_stats.txt", "w") + p = subprocess.Popen(data["args"], stdout=airflow_stats, stderr=subprocess.PIPE) + airflow_stats.close() + timer.time("after_start_subprocess") + p.wait() + timer.time("after_finished_subprocess") + logging.info(f"exitcode: {p.returncode}; stderr: {p.stderr.read()}") + + # retrieve xcom outputs + try: + with open(airflow_output_path, 'r') as f: + xcoms = [json.loads(line) for line in 
f.readlines()]
+    except FileNotFoundError:
+        xcoms = []
+    timer.time("function_exit")
+    timing_info = timer.get_timing_info()
+    res = json.dumps({"xcoms": xcoms, "timing_info": timing_info})
+    return res
+
+if __name__ == "__main__":
+    app.run(host='0.0.0.0', port=50000)
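
To make the worker's request/response contract easier to see, here is a minimal sketch of calling the `/run_task_instance` endpoint from `entrypoint_profiling.py` by hand. The payload keys (`annotations`, `xcoms`, `args`) mirror what the endpoint reads from `request.json`; the worker URL, the identifiers, and the `airflow tasks run` invocation in `args` are placeholders rather than values taken from this repository, and the `requests` package is assumed to be available.

```python
import json

import requests  # assumed HTTP client; any client that can POST JSON works

# Hypothetical address of a worker built from Dockerfile_profiling; inside the
# cluster this would be the worker's Knative service URL instead of localhost.
WORKER_URL = "http://localhost:50000/run_task_instance"

payload = {
    # "annotations" identify the task instance and are echoed back in the
    # returned timing_info (all identifiers below are placeholders).
    "annotations": {
        "dag_id": "example_dag",
        "task_id": "example_task",
        "run_id": "manual__2023-01-01T00-00-00",
        "map_index": -1,
        "try_number": 1,
    },
    # upstream XCom values; the worker writes them to the task's input file
    "xcoms": [],
    # command executed by the worker's subprocess (placeholder invocation)
    "args": ["airflow", "tasks", "run", "example_dag", "example_task", "manual__2023-01-01T00-00-00"],
}

resp = requests.post(WORKER_URL, json=payload)
resp.raise_for_status()
result = resp.json()

# "timing_info" has the same shape as the TIMING entries that
# scripts/analyze_per_task_latency.py extracts from the scheduler log.
print(json.dumps(result["timing_info"], indent=2))
```

The `xcoms` list in the response is whatever the task wrote to its output file, so the same call can also be used to check that a task produces the expected XCom values outside of a full benchmark run.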