diff --git a/chart/templates/_helpers.yaml b/chart/templates/_helpers.yaml index 25a70cb780222..887e9bd33cbb1 100644 --- a/chart/templates/_helpers.yaml +++ b/chart/templates/_helpers.yaml @@ -590,122 +590,80 @@ server_tls_key_file = /etc/pgbouncer/server.key {{- end }} {{- end }} -{{/* Create the name of the webserver service account to use */}} -{{- define "webserver.serviceAccountName" -}} - {{- if .Values.webserver.serviceAccount.create }} - {{- default (printf "%s-webserver" (include "airflow.serviceAccountName" .)) .Values.webserver.serviceAccount.name }} +{{/* Helper to generate service account name respecting .Values.$section.serviceAccount flags */}} +{{- define "_serviceAccountName" -}} + {{- $sa := get (get .Values .key) "serviceAccount" }} + {{- if $sa.create }} + {{- default (printf "%s-%s" (include "airflow.serviceAccountName" .) (default .key .nameSuffix )) $sa.name | quote }} {{- else }} - {{- default "default" .Values.webserver.serviceAccount.name }} + {{- default "default" $sa.name | quote }} {{- end }} {{- end }} +{{/* Create the name of the webserver service account to use */}} +{{- define "webserver.serviceAccountName" -}} + {{- include "_serviceAccountName" (merge (dict "key" "webserver") .) -}} +{{- end }} + -{{/* Create the name of the RPC server service account to use */}} -{{- define "rpcServer.serviceAccountName" -}} - {{- if .Values._rpcServer.serviceAccount.create }} - {{- default (printf "%s-rpc-server" (include "airflow.serviceAccountName" .)) .Values._rpcServer.serviceAccount.name }} - {{- else }} - {{- default "default" .Values._rpcServer.serviceAccount.name }} - {{- end }} +{{/* Create the name of the API server service account to use */}} +{{- define "apiServer.serviceAccountName" -}} + {{- include "_serviceAccountName" (merge (dict "key" "apiServer" "nameSuffix" "api-server" ) .) -}} {{- end }} {{/* Create the name of the redis service account to use */}} {{- define "redis.serviceAccountName" -}} - {{- if .Values.redis.serviceAccount.create }} - {{- default (printf "%s-redis" (include "airflow.serviceAccountName" .)) .Values.redis.serviceAccount.name }} - {{- else }} - {{- default "default" .Values.redis.serviceAccount.name }} - {{- end }} + {{- include "_serviceAccountName" (merge (dict "key" "redis") .) -}} {{- end }} {{/* Create the name of the flower service account to use */}} {{- define "flower.serviceAccountName" -}} - {{- if .Values.flower.serviceAccount.create }} - {{- default (printf "%s-flower" (include "airflow.serviceAccountName" .)) .Values.flower.serviceAccount.name }} - {{- else }} - {{- default "default" .Values.flower.serviceAccount.name }} - {{- end }} + {{- include "_serviceAccountName" (merge (dict "key" "flower") .) -}} {{- end }} {{/* Create the name of the scheduler service account to use */}} {{- define "scheduler.serviceAccountName" -}} - {{- if .Values.scheduler.serviceAccount.create }} - {{- default (printf "%s-scheduler" (include "airflow.serviceAccountName" .)) .Values.scheduler.serviceAccount.name }} - {{- else }} - {{- default "default" .Values.scheduler.serviceAccount.name }} - {{- end }} + {{- include "_serviceAccountName" (merge (dict "key" "scheduler") .) -}} {{- end }} {{/* Create the name of the StatsD service account to use */}} {{- define "statsd.serviceAccountName" -}} - {{- if .Values.statsd.serviceAccount.create }} - {{- default (printf "%s-statsd" (include "airflow.serviceAccountName" .)) .Values.statsd.serviceAccount.name }} - {{- else }} - {{- default "default" .Values.statsd.serviceAccount.name }} - {{- end }} + {{- include "_serviceAccountName" (merge (dict "key" "statsd") .) -}} {{- end }} {{/* Create the name of the create user job service account to use */}} {{- define "createUserJob.serviceAccountName" -}} - {{- if .Values.createUserJob.serviceAccount.create }} - {{- default (printf "%s-create-user-job" (include "airflow.serviceAccountName" .)) .Values.createUserJob.serviceAccount.name }} - {{- else }} - {{- default "default" .Values.createUserJob.serviceAccount.name }} - {{- end }} + {{- include "_serviceAccountName" (merge (dict "key" "createUserJob" "nameSuffix" "create-user-job") .) -}} {{- end }} {{/* Create the name of the migrate database job service account to use */}} {{- define "migrateDatabaseJob.serviceAccountName" -}} - {{- if .Values.migrateDatabaseJob.serviceAccount.create }} - {{- default (printf "%s-migrate-database-job" (include "airflow.serviceAccountName" .)) .Values.migrateDatabaseJob.serviceAccount.name }} - {{- else }} - {{- default "default" .Values.migrateDatabaseJob.serviceAccount.name }} - {{- end }} + {{- include "_serviceAccountName" (merge (dict "key" "migrateDatabaseJob" "nameSuffix" "migrate-database-job") .) -}} {{- end }} {{/* Create the name of the worker service account to use */}} {{- define "worker.serviceAccountName" -}} - {{- if .Values.workers.serviceAccount.create }} - {{- default (printf "%s-worker" (include "airflow.serviceAccountName" .)) .Values.workers.serviceAccount.name }} - {{- else }} - {{- default "default" .Values.workers.serviceAccount.name }} - {{- end }} + {{- include "_serviceAccountName" (merge (dict "key" "workers" "nameSuffix" "worker") .) -}} {{- end }} {{/* Create the name of the triggerer service account to use */}} {{- define "triggerer.serviceAccountName" -}} - {{- if .Values.triggerer.serviceAccount.create }} - {{- default (printf "%s-triggerer" (include "airflow.serviceAccountName" .)) .Values.triggerer.serviceAccount.name }} - {{- else }} - {{- default "default" .Values.triggerer.serviceAccount.name }} - {{- end }} + {{- include "_serviceAccountName" (merge (dict "key" "triggerer") .) -}} {{- end }} {{/* Create the name of the dag processor service account to use */}} {{- define "dagProcessor.serviceAccountName" -}} - {{- if .Values.dagProcessor.serviceAccount.create }} - {{- default (printf "%s-dag-processor" (include "airflow.serviceAccountName" .)) .Values.dagProcessor.serviceAccount.name }} - {{- else }} - {{- default "default" .Values.dagProcessor.serviceAccount.name }} - {{- end }} + {{- include "_serviceAccountName" (merge (dict "key" "dagProcessor" "nameSuffix" "dag-processor") .) -}} {{- end }} {{/* Create the name of the pgbouncer service account to use */}} {{- define "pgbouncer.serviceAccountName" -}} - {{- if .Values.pgbouncer.serviceAccount.create }} - {{- default (printf "%s-pgbouncer" (include "airflow.serviceAccountName" .)) .Values.pgbouncer.serviceAccount.name }} - {{- else }} - {{- default "default" .Values.pgbouncer.serviceAccount.name }} - {{- end }} + {{- include "_serviceAccountName" (merge (dict "key" "pgbouncer") .) -}} {{- end }} {{/* Create the name of the cleanup service account to use */}} {{- define "cleanup.serviceAccountName" -}} - {{- if .Values.cleanup.serviceAccount.create }} - {{- default (printf "%s-cleanup" (include "airflow.serviceAccountName" .)) .Values.cleanup.serviceAccount.name }} - {{- else }} - {{- default "default" .Values.cleanup.serviceAccount.name }} - {{- end }} + {{- include "_serviceAccountName" (merge (dict "key" "cleanup") .) -}} {{- end }} {{- define "wait-for-migrations-command" -}} diff --git a/chart/templates/rpc-server/rpc-server-deployment.yaml b/chart/templates/api-server/api-server-deployment.yaml similarity index 54% rename from chart/templates/rpc-server/rpc-server-deployment.yaml rename to chart/templates/api-server/api-server-deployment.yaml index 4a3766c585322..b4cf2cd4461f1 100644 --- a/chart/templates/rpc-server/rpc-server-deployment.yaml +++ b/chart/templates/api-server/api-server-deployment.yaml @@ -18,44 +18,43 @@ */}} ################################ -## Airflow rpc-server Deployment +## Airflow API Server Deployment ################################# -{{- if .Values._rpcServer.enabled }} -{{- $nodeSelector := or .Values._rpcServer.nodeSelector .Values.nodeSelector }} -{{- $affinity := or .Values._rpcServer.affinity .Values.affinity }} -{{- $tolerations := or .Values._rpcServer.tolerations .Values.tolerations }} -{{- $topologySpreadConstraints := or .Values._rpcServer.topologySpreadConstraints .Values.topologySpreadConstraints }} -{{- $revisionHistoryLimit := or .Values._rpcServer.revisionHistoryLimit .Values.revisionHistoryLimit }} -{{- $securityContext := include "airflowPodSecurityContext" (list . .Values._rpcServer) }} -{{- $containerSecurityContext := include "containerSecurityContext" (list . .Values._rpcServer) }} -{{- $containerSecurityContextWaitForMigrations := include "containerSecurityContext" (list . .Values._rpcServer.waitForMigrations) }} -{{- $containerLifecycleHooks := or .Values._rpcServer.containerLifecycleHooks .Values.containerLifecycleHooks }} +{{- if semverCompare ">=3.0.0" .Values.airflowVersion }} +{{- $nodeSelector := or .Values.apiServer.nodeSelector .Values.nodeSelector }} +{{- $affinity := or .Values.apiServer.affinity .Values.affinity }} +{{- $tolerations := or .Values.apiServer.tolerations .Values.tolerations }} +{{- $topologySpreadConstraints := or .Values.apiServer.topologySpreadConstraints .Values.topologySpreadConstraints }} +{{- $revisionHistoryLimit := or .Values.apiServer.revisionHistoryLimit .Values.revisionHistoryLimit }} +{{- $securityContext := include "airflowPodSecurityContext" (list . .Values.apiServer) }} +{{- $containerSecurityContext := include "containerSecurityContext" (list . .Values.apiServer) }} +{{- $containerSecurityContextWaitForMigrations := include "containerSecurityContext" (list . .Values.apiServer.waitForMigrations) }} +{{- $containerLifecycleHooks := or .Values.apiServer.containerLifecycleHooks .Values.containerLifecycleHooks }} apiVersion: apps/v1 kind: Deployment metadata: - name: {{ include "airflow.fullname" . }}-rpc-server + name: {{ include "airflow.fullname" . }}-api-server labels: tier: airflow - component: rpc-server + component: api-server release: {{ .Release.Name }} chart: "{{ .Chart.Name }}-{{ .Chart.Version }}" heritage: {{ .Release.Service }} {{- with .Values.labels }} {{- toYaml . | nindent 4 }} {{- end }} - {{- if .Values._rpcServer.annotations }} - annotations: {{- toYaml .Values._rpcServer.annotations | nindent 4 }} + {{- if .Values.apiServer.annotations }} + annotations: {{- toYaml .Values.apiServer.annotations | nindent 4 }} {{- end }} spec: - replicas: {{ .Values._rpcServer.replicas }} + replicas: {{ .Values.apiServer.replicas }} {{- if $revisionHistoryLimit }} revisionHistoryLimit: {{ $revisionHistoryLimit }} {{- end }} strategy: - {{- if .Values._rpcServer.strategy }} - {{- toYaml .Values._rpcServer.strategy | nindent 4 }} + {{- if .Values.apiServer.strategy }} + {{- toYaml .Values.apiServer.strategy | nindent 4 }} {{- else }} - {{- if semverCompare ">=2.0.0" .Values.airflowVersion }} # Here we define the rolling update strategy # - maxSurge define how many pod we can add at a time # - maxUnavailable define how many pod can be unavailable @@ -67,23 +66,20 @@ spec: rollingUpdate: maxSurge: 1 maxUnavailable: 0 - {{- else }} - type: Recreate - {{- end }} {{- end }} selector: matchLabels: tier: airflow - component: rpc-server + component: api-server release: {{ .Release.Name }} template: metadata: labels: tier: airflow - component: rpc-server + component: api-server release: {{ .Release.Name }} - {{- if or (.Values.labels) (.Values._rpcServer.labels) }} - {{- mustMerge .Values._rpcServer.labels .Values.labels | toYaml | nindent 8 }} + {{- if or (.Values.labels) (.Values.apiServer.labels) }} + {{- mustMerge .Values.apiServer.labels .Values.labels | toYaml | nindent 8 }} {{- end }} annotations: checksum/metadata-secret: {{ include (print $.Template.BasePath "/secrets/metadata-connection-secret.yaml") . | sha256sum }} @@ -94,16 +90,16 @@ spec: {{- if .Values.airflowPodAnnotations }} {{- toYaml .Values.airflowPodAnnotations | nindent 8 }} {{- end }} - {{- if .Values._rpcServer.podAnnotations }} - {{- toYaml .Values._rpcServer.podAnnotations | nindent 8 }} + {{- if .Values.apiServer.podAnnotations }} + {{- toYaml .Values.apiServer.podAnnotations | nindent 8 }} {{- end }} spec: - {{- if .Values._rpcServer.hostAliases }} - hostAliases: {{- toYaml .Values._rpcServer.hostAliases | nindent 8 }} + {{- if .Values.apiServer.hostAliases }} + hostAliases: {{- toYaml .Values.apiServer.hostAliases | nindent 8 }} {{- end }} - serviceAccountName: {{ include "rpcServer.serviceAccountName" . }} - {{- if .Values._rpcServer.priorityClassName }} - priorityClassName: {{ .Values._rpcServer.priorityClassName }} + serviceAccountName: {{ include "apiServer.serviceAccountName" . }} + {{- if .Values.apiServer.priorityClassName }} + priorityClassName: {{ .Values.apiServer.priorityClassName }} {{- end }} {{- if .Values.schedulerName }} schedulerName: {{ .Values.schedulerName }} @@ -118,7 +114,7 @@ spec: - podAffinityTerm: labelSelector: matchLabels: - component: rpc-server + component: api-server topologyKey: kubernetes.io/hostname weight: 100 {{- end }} @@ -131,9 +127,9 @@ spec: - name: {{ template "registry_secret" . }} {{- end }} initContainers: - {{- if .Values._rpcServer.waitForMigrations.enabled }} + {{- if .Values.apiServer.waitForMigrations.enabled }} - name: wait-for-airflow-migrations - resources: {{- toYaml .Values._rpcServer.resources | nindent 12 }} + resources: {{- toYaml .Values.apiServer.resources | nindent 12 }} image: {{ template "airflow_image_for_migrations" . }} imagePullPolicy: {{ .Values.images.airflow.pullPolicy }} securityContext: {{ $containerSecurityContextWaitForMigrations | nindent 12 }} @@ -142,36 +138,36 @@ spec: {{- if .Values.volumeMounts }} {{- toYaml .Values.volumeMounts | nindent 12 }} {{- end }} - {{- if .Values._rpcServer.extraVolumeMounts }} - {{- tpl (toYaml .Values._rpcServer.extraVolumeMounts) . | nindent 12 }} + {{- if .Values.apiServer.extraVolumeMounts }} + {{- tpl (toYaml .Values.apiServer.extraVolumeMounts) . | nindent 12 }} {{- end }} args: {{- include "wait-for-migrations-command" . | indent 10 }} envFrom: {{- include "custom_airflow_environment_from" . | default "\n []" | indent 10 }} env: {{- include "custom_airflow_environment" . | indent 10 }} {{- include "standard_airflow_environment" . | indent 10 }} - {{- if .Values._rpcServer.waitForMigrations.env }} - {{- tpl (toYaml .Values._rpcServer.waitForMigrations.env) $ | nindent 12 }} + {{- if .Values.apiServer.waitForMigrations.env }} + {{- tpl (toYaml .Values.apiServer.waitForMigrations.env) $ | nindent 12 }} {{- end }} {{- end }} - {{- if .Values._rpcServer.extraInitContainers }} - {{- toYaml .Values._rpcServer.extraInitContainers | nindent 8 }} + {{- if .Values.apiServer.extraInitContainers }} + {{- toYaml .Values.apiServer.extraInitContainers | nindent 8 }} {{- end }} containers: - - name: rpc-server + - name: api-server image: {{ template "airflow_image" . }} imagePullPolicy: {{ .Values.images.airflow.pullPolicy }} securityContext: {{ $containerSecurityContext | nindent 12 }} {{- if $containerLifecycleHooks }} lifecycle: {{- tpl (toYaml $containerLifecycleHooks) . | nindent 12 }} {{- end }} - {{- if .Values._rpcServer.command }} - command: {{ tpl (toYaml .Values._rpcServer.command) . | nindent 12 }} + {{- if .Values.apiServer.command }} + command: {{ tpl (toYaml .Values.apiServer.command) . | nindent 12 }} {{- end }} - {{- if .Values._rpcServer.args }} - args: {{- tpl (toYaml .Values._rpcServer.args) . | nindent 12 }} + {{- if .Values.apiServer.args }} + args: {{- tpl (toYaml .Values.apiServer.args) . | nindent 12 }} {{- end }} - resources: {{- toYaml .Values._rpcServer.resources | nindent 12 }} + resources: {{- toYaml .Values.apiServer.resources | nindent 12 }} volumeMounts: {{- include "airflow_config_mount" . | nindent 12 }} {{- if .Values.logs.persistence.enabled }} @@ -181,63 +177,48 @@ spec: {{- if .Values.volumeMounts }} {{- toYaml .Values.volumeMounts | nindent 12 }} {{- end }} - {{- if .Values._rpcServer.extraVolumeMounts }} - {{- tpl (toYaml .Values._rpcServer.extraVolumeMounts) . | nindent 12 }} + {{- if .Values.apiServer.extraVolumeMounts }} + {{- tpl (toYaml .Values.apiServer.extraVolumeMounts) . | nindent 12 }} {{- end }} ports: - - name: rpc-server - containerPort: {{ .Values.ports._rpcServer }} + - name: api-server + containerPort: {{ .Values.ports.apiServer }} livenessProbe: httpGet: - path: {{ if .Values.config.core.internal_api_url }}{{- with urlParse (tpl .Values.config.core.internal_api_url .) }}{{ .path }}{{ end }}{{ end }}/internal_api/v1/health - port: {{ .Values.ports._rpcServer }} - {{- if .Values.config.core.internal_api_url}} - httpHeaders: - - name: Host - value: {{ regexReplaceAll ":\\d+$" (urlParse (tpl .Values.config.core.internal_api_url .)).host "" }} - {{- end }} - scheme: {{ .Values._rpcServer.livenessProbe.scheme | default "http" }} - initialDelaySeconds: {{ .Values._rpcServer.livenessProbe.initialDelaySeconds }} - timeoutSeconds: {{ .Values._rpcServer.livenessProbe.timeoutSeconds }} - failureThreshold: {{ .Values._rpcServer.livenessProbe.failureThreshold }} - periodSeconds: {{ .Values._rpcServer.livenessProbe.periodSeconds }} + path: /public/version + port: {{ .Values.ports.apiServer }} + scheme: {{ .Values.apiServer.livenessProbe.scheme | default "http" }} + initialDelaySeconds: {{ .Values.apiServer.livenessProbe.initialDelaySeconds }} + timeoutSeconds: {{ .Values.apiServer.livenessProbe.timeoutSeconds }} + failureThreshold: {{ .Values.apiServer.livenessProbe.failureThreshold }} + periodSeconds: {{ .Values.apiServer.livenessProbe.periodSeconds }} readinessProbe: httpGet: - path: {{ if .Values.config.core.internal_api_url }}{{- with urlParse (tpl .Values.config.core.internal_api_url .) }}{{ .path }}{{ end }}{{ end }}/internal_api/v1/health - port: {{ .Values.ports._rpcServer }} - {{- if .Values.config.core.internal_api_url }} - httpHeaders: - - name: Host - value: {{ regexReplaceAll ":\\d+$" (urlParse (tpl .Values.config.core.internal_api_url .)).host "" }} - {{- end }} - scheme: {{ .Values._rpcServer.readinessProbe.scheme | default "http" }} - initialDelaySeconds: {{ .Values._rpcServer.readinessProbe.initialDelaySeconds }} - timeoutSeconds: {{ .Values._rpcServer.readinessProbe.timeoutSeconds }} - failureThreshold: {{ .Values._rpcServer.readinessProbe.failureThreshold }} - periodSeconds: {{ .Values._rpcServer.readinessProbe.periodSeconds }} + path: /public/version + port: {{ .Values.ports.apiServer }} + scheme: {{ .Values.apiServer.readinessProbe.scheme | default "http" }} + initialDelaySeconds: {{ .Values.apiServer.readinessProbe.initialDelaySeconds }} + timeoutSeconds: {{ .Values.apiServer.readinessProbe.timeoutSeconds }} + failureThreshold: {{ .Values.apiServer.readinessProbe.failureThreshold }} + periodSeconds: {{ .Values.apiServer.readinessProbe.periodSeconds }} startupProbe: httpGet: - path: {{ if .Values.config.core.internal_api_url }}{{- with urlParse (tpl .Values.config.core.internal_api_url .) }}{{ .path }}{{ end }}{{ end }}/internal_api/v1/health - port: {{ .Values.ports._rpcServer }} - {{- if .Values.config.core.internal_api_url}} - httpHeaders: - - name: Host - value: {{ regexReplaceAll ":\\d+$" (urlParse (tpl .Values.config.core.internal_api_url .)).host "" }} - {{- end }} - scheme: {{ .Values._rpcServer.startupProbe.scheme | default "http" }} - timeoutSeconds: {{ .Values._rpcServer.startupProbe.timeoutSeconds }} - failureThreshold: {{ .Values._rpcServer.startupProbe.failureThreshold }} - periodSeconds: {{ .Values._rpcServer.startupProbe.periodSeconds }} + path: /public/version + port: {{ .Values.ports.apiServer }} + scheme: {{ .Values.apiServer.startupProbe.scheme | default "http" }} + timeoutSeconds: {{ .Values.apiServer.startupProbe.timeoutSeconds }} + failureThreshold: {{ .Values.apiServer.startupProbe.failureThreshold }} + periodSeconds: {{ .Values.apiServer.startupProbe.periodSeconds }} envFrom: {{- include "custom_airflow_environment_from" . | default "\n []" | indent 10 }} env: {{- include "custom_airflow_environment" . | indent 10 }} {{- include "standard_airflow_environment" . | indent 10 }} - {{- include "container_extra_envs" (list . .Values._rpcServer.env) | indent 10 }} + {{- include "container_extra_envs" (list . .Values.apiServer.env) | indent 10 }} {{- if and (.Values.dags.gitSync.enabled) (not .Values.dags.persistence.enabled) (semverCompare "<2.0.0" .Values.airflowVersion) }} {{- include "git_sync_container" . | nindent 8 }} {{- end }} - {{- if .Values._rpcServer.extraContainers }} - {{- tpl (toYaml .Values._rpcServer.extraContainers) . | nindent 8 }} + {{- if .Values.apiServer.extraContainers }} + {{- tpl (toYaml .Values.apiServer.extraContainers) . | nindent 8 }} {{- end }} volumes: - name: config @@ -253,7 +234,7 @@ spec: {{- if .Values.volumes }} {{- toYaml .Values.volumes | nindent 8 }} {{- end }} - {{- if .Values._rpcServer.extraVolumes }} - {{- tpl (toYaml .Values._rpcServer.extraVolumes) . | nindent 8 }} + {{- if .Values.apiServer.extraVolumes }} + {{- tpl (toYaml .Values.apiServer.extraVolumes) . | nindent 8 }} {{- end }} {{- end }} diff --git a/chart/templates/rpc-server/rpc-server-networkpolicy.yaml b/chart/templates/api-server/api-server-networkpolicy.yaml similarity index 71% rename from chart/templates/rpc-server/rpc-server-networkpolicy.yaml rename to chart/templates/api-server/api-server-networkpolicy.yaml index ce3453429450b..af4601811200e 100644 --- a/chart/templates/rpc-server/rpc-server-networkpolicy.yaml +++ b/chart/templates/api-server/api-server-networkpolicy.yaml @@ -18,36 +18,36 @@ */}} ################################ -## Airflow rpc-server NetworkPolicy +## Airflow API server NetworkPolicy ################################# -{{- if .Values._rpcServer.enabled }} +{{- if semverCompare ">=3.0.0" .Values.airflowVersion }} {{- if .Values.networkPolicies.enabled }} apiVersion: networking.k8s.io/v1 kind: NetworkPolicy metadata: - name: {{ include "airflow.fullname" . }}-rpc-server-policy + name: {{ include "airflow.fullname" . }}-api-server-policy labels: tier: airflow - component: airflow-rpc-server-policy + component: airflow-api-server-policy release: {{ .Release.Name }} chart: "{{ .Chart.Name }}-{{ .Chart.Version }}" heritage: {{ .Release.Service }} - {{- if or (.Values.labels) (.Values._rpcServer.labels) }} - {{- mustMerge .Values._rpcServer.labels .Values.labels | toYaml | nindent 4 }} + {{- if or (.Values.labels) (.Values.apiServer.labels) }} + {{- mustMerge .Values.apiServer.labels .Values.labels | toYaml | nindent 4 }} {{- end }} spec: podSelector: matchLabels: tier: airflow - component: rpc-server + component: api-server release: {{ .Release.Name }} policyTypes: - Ingress - {{- if .Values._rpcServer.networkPolicy.ingress.from }} + {{- if .Values.apiServer.networkPolicy.ingress.from }} ingress: - - from: {{- toYaml .Values._rpcServer.networkPolicy.ingress.from | nindent 6 }} + - from: {{- toYaml .Values.apiServer.networkPolicy.ingress.from | nindent 6 }} ports: - {{ range .Values._rpcServer.networkPolicy.ingress.ports }} + {{ range .Values.apiServer.networkPolicy.ingress.ports }} - {{- range $key, $val := . }} {{ $key }}: {{ tpl (toString $val) $ }} diff --git a/chart/templates/rpc-server/rpc-server-poddisruptionbudget.yaml b/chart/templates/api-server/api-server-poddisruptionbudget.yaml similarity index 70% rename from chart/templates/rpc-server/rpc-server-poddisruptionbudget.yaml rename to chart/templates/api-server/api-server-poddisruptionbudget.yaml index e6d4afebb2da0..7d0b162e41ea9 100644 --- a/chart/templates/rpc-server/rpc-server-poddisruptionbudget.yaml +++ b/chart/templates/api-server/api-server-poddisruptionbudget.yaml @@ -18,29 +18,29 @@ */}} ################################ -## Airflow rpc-server PodDisruptionBudget +## Airflow api-server PodDisruptionBudget ################################# -{{- if .Values._rpcServer.enabled }} -{{- if .Values._rpcServer.podDisruptionBudget.enabled }} +{{- if semverCompare ">=3.0.0" .Values.airflowVersion }} +{{- if .Values.apiServer.podDisruptionBudget.enabled }} apiVersion: policy/v1 kind: PodDisruptionBudget metadata: - name: {{ include "airflow.fullname" . }}-rpc-server-pdb + name: {{ include "airflow.fullname" . }}-api-server-pdb labels: tier: airflow - component: rpc-server + component: api-server release: {{ .Release.Name }} chart: {{ .Chart.Name }} heritage: {{ .Release.Service }} - {{- if or (.Values.labels) (.Values._rpcServer.labels) }} - {{- mustMerge .Values._rpcServer.labels .Values.labels | toYaml | nindent 4 }} + {{- if or (.Values.labels) (.Values.apiServer.labels) }} + {{- mustMerge .Values.apiServer.labels .Values.labels | toYaml | nindent 4 }} {{- end }} spec: selector: matchLabels: tier: airflow - component: rpc-server + component: api-server release: {{ .Release.Name }} - {{- toYaml .Values._rpcServer.podDisruptionBudget.config | nindent 2 }} + {{- toYaml .Values.apiServer.podDisruptionBudget.config | nindent 2 }} {{- end }} {{- end }} diff --git a/chart/templates/rpc-server/rpc-server-service.yaml b/chart/templates/api-server/api-server-service.yaml similarity index 63% rename from chart/templates/rpc-server/rpc-server-service.yaml rename to chart/templates/api-server/api-server-service.yaml index f6b9160d2fb48..0a360aee08539 100644 --- a/chart/templates/rpc-server/rpc-server-service.yaml +++ b/chart/templates/api-server/api-server-service.yaml @@ -18,42 +18,42 @@ */}} ################################ -## Airflow rpc-server Service +## Airflow api-server Service ################################# -{{- if .Values._rpcServer.enabled }} +{{- if semverCompare ">=3.0.0" .Values.airflowVersion }} apiVersion: v1 kind: Service metadata: - name: {{ include "airflow.fullname" . }}-rpc-server + name: {{ include "airflow.fullname" . }}-api-server labels: tier: airflow - component: rpc-server + component: api-server release: {{ .Release.Name }} chart: "{{ .Chart.Name }}-{{ .Chart.Version }}" heritage: {{ .Release.Service }} - {{- if or (.Values.labels) (.Values._rpcServer.labels) }} - {{- mustMerge .Values._rpcServer.labels .Values.labels | toYaml | nindent 4 }} + {{- if or (.Values.labels) (.Values.apiServer.labels) }} + {{- mustMerge .Values.apiServer.labels .Values.labels | toYaml | nindent 4 }} {{- end }} - {{- with .Values._rpcServer.service.annotations }} + {{- with .Values.apiServer.service.annotations }} annotations: {{- toYaml . | nindent 4 }} {{- end }} spec: - type: {{ .Values._rpcServer.service.type }} + type: {{ .Values.apiServer.service.type }} selector: tier: airflow - component: rpc-server + component: api-server release: {{ .Release.Name }} ports: - {{ range .Values._rpcServer.service.ports }} + {{ range .Values.apiServer.service.ports }} - {{- range $key, $val := . }} {{ $key }}: {{ tpl (toString $val) $ }} {{- end }} {{- end }} - {{- if .Values._rpcServer.service.loadBalancerIP }} - loadBalancerIP: {{ .Values._rpcServer.service.loadBalancerIP }} + {{- if .Values.apiServer.service.loadBalancerIP }} + loadBalancerIP: {{ .Values.apiServer.service.loadBalancerIP }} {{- end }} - {{- if .Values._rpcServer.service.loadBalancerSourceRanges }} - loadBalancerSourceRanges: {{- toYaml .Values._rpcServer.service.loadBalancerSourceRanges | nindent 4 }} + {{- if .Values.apiServer.service.loadBalancerSourceRanges }} + loadBalancerSourceRanges: {{- toYaml .Values.apiServer.service.loadBalancerSourceRanges | nindent 4 }} {{- end }} {{- end }} diff --git a/chart/templates/rpc-server/rpc-server-serviceaccount.yaml b/chart/templates/api-server/api-server-serviceaccount.yaml similarity index 68% rename from chart/templates/rpc-server/rpc-server-serviceaccount.yaml rename to chart/templates/api-server/api-server-serviceaccount.yaml index 655b346fbe0ab..3b864d01602fa 100644 --- a/chart/templates/rpc-server/rpc-server-serviceaccount.yaml +++ b/chart/templates/api-server/api-server-serviceaccount.yaml @@ -18,24 +18,24 @@ */}} ###################################### -## Airflow rpc-server ServiceAccount +## Airflow api-server ServiceAccount ###################################### -{{- if and .Values._rpcServer.enabled .Values._rpcServer.serviceAccount.create }} +{{- if and .Values.apiServer.serviceAccount.create (semverCompare ">=3.0.0" .Values.airflowVersion) }} apiVersion: v1 kind: ServiceAccount -automountServiceAccountToken: {{ .Values._rpcServer.serviceAccount.automountServiceAccountToken }} +automountServiceAccountToken: {{ .Values.apiServer.serviceAccount.automountServiceAccountToken }} metadata: - name: {{ include "rpcServer.serviceAccountName" . }} + name: {{ include "apiServer.serviceAccountName" . }} labels: tier: airflow - component: rpc-server + component: api-server release: {{ .Release.Name }} chart: "{{ .Chart.Name }}-{{ .Chart.Version }}" heritage: {{ .Release.Service }} - {{- if or (.Values.labels) (.Values._rpcServer.labels) }} - {{- mustMerge .Values._rpcServer.labels .Values.labels | toYaml | nindent 4 }} + {{- if or (.Values.labels) (.Values.apiServer.labels) }} + {{- mustMerge .Values.apiServer.labels .Values.labels | toYaml | nindent 4 }} {{- end }} - {{- with .Values._rpcServer.serviceAccount.annotations }} + {{- with .Values.apiServer.serviceAccount.annotations }} annotations: {{- toYaml . | nindent 4 }} {{- end }} {{- end }} diff --git a/chart/templates/configmaps/configmap.yaml b/chart/templates/configmaps/configmap.yaml index cb8139cc6eeb6..119e43cce0a5e 100644 --- a/chart/templates/configmaps/configmap.yaml +++ b/chart/templates/configmaps/configmap.yaml @@ -38,6 +38,13 @@ metadata: {{- end }} {{- $Global := . }} data: + {{/*- Set a default for workers.execution_api_server_url pointing to the api-server service if it's not set -*/}} + {{- if semverCompare ">=3.0.0" .Values.airflowVersion -}} + {{- $config := merge .Values.config ( dict "workers" dict )}} + {{- if not (hasKey $config.workers "execution_api_server_url") -}} + {{- $_ := set $config.workers "execution_api_server_url" (printf "http://%s-api-server:%d/execution/" (include "airflow.fullname" .) (int .Values.ports.apiServer)) -}} + {{- end -}} + {{- end -}} # These are system-specified config overrides. airflow.cfg: |- {{- range $section, $settings := .Values.config }} diff --git a/chart/values.schema.json b/chart/values.schema.json index 6bfa5fe8b5e7b..813a08e7170fb 100644 --- a/chart/values.schema.json +++ b/chart/values.schema.json @@ -9,7 +9,7 @@ "Ports", "Database", "PgBouncer", - "RPC Server", + "API Server", "Scheduler", "Webserver", "Workers", @@ -4668,19 +4668,14 @@ } } }, - "_rpcServer": { - "description": "Airflow RPC server settings (AIP-44). Experimental / for dev purpose only.", + "apiServer": { + "description": "Airflow API server settings.", "type": "object", - "x-docsSection": "RPC Server", + "x-docsSection": "API Server", "additionalProperties": false, "properties": { - "enabled": { - "description": "Enable RPC server", - "type": "boolean", - "default": false - }, "configMapAnnotations": { - "description": "Extra annotations to apply to the RPC server configmap.", + "description": "Extra annotations to apply to the API server configmap.", "type": "object", "default": {}, "additionalProperties": { @@ -4688,7 +4683,7 @@ } }, "hostAliases": { - "description": "HostAliases for the RPC server pod.", + "description": "HostAliases for the API server pod.", "items": { "$ref": "#/definitions/io.k8s.api.core.v1.HostAlias" }, @@ -4710,7 +4705,7 @@ ] }, "allowPodLogReading": { - "description": "Allow RPC server to read k8s pod logs. Useful when you don't have an external log store.", + "description": "Allow API server to read k8s pod logs. Useful when you don't have an external log store.", "type": "boolean", "default": true }, @@ -4720,27 +4715,27 @@ "additionalProperties": false, "properties": { "initialDelaySeconds": { - "description": "RPC server Liveness probe initial delay.", + "description": "API server Liveness probe initial delay.", "type": "integer", "default": 15 }, "timeoutSeconds": { - "description": "RPC server Liveness probe timeout seconds.", + "description": "API server Liveness probe timeout seconds.", "type": "integer", "default": 5 }, "failureThreshold": { - "description": "RPC server Liveness probe failure threshold.", + "description": "API server Liveness probe failure threshold.", "type": "integer", "default": 5 }, "periodSeconds": { - "description": "RPC server Liveness probe period seconds.", + "description": "API server Liveness probe period seconds.", "type": "integer", "default": 10 }, "scheme": { - "description": "RPC server Liveness probe scheme.", + "description": "API server Liveness probe scheme.", "type": "string", "default": "HTTP" } @@ -4752,27 +4747,27 @@ "additionalProperties": false, "properties": { "initialDelaySeconds": { - "description": "RPC server Readiness probe initial delay.", + "description": "API server Readiness probe initial delay.", "type": "integer", "default": 15 }, "timeoutSeconds": { - "description": "RPC server Readiness probe timeout seconds.", + "description": "API server Readiness probe timeout seconds.", "type": "integer", "default": 5 }, "failureThreshold": { - "description": "RPC server Readiness probe failure threshold.", + "description": "API server Readiness probe failure threshold.", "type": "integer", "default": 5 }, "periodSeconds": { - "description": "RPC server Readiness probe period seconds.", + "description": "API server Readiness probe period seconds.", "type": "integer", "default": 10 }, "scheme": { - "description": "RPC server Readiness probe scheme.", + "description": "API server Readiness probe scheme.", "type": "string", "default": "HTTP" } @@ -4784,29 +4779,29 @@ "additionalProperties": false, "properties": { "timeoutSeconds": { - "description": "RPC server Startup probe timeout seconds.", + "description": "API server Startup probe timeout seconds.", "type": "integer", "default": 20 }, "failureThreshold": { - "description": "RPC server Startup probe failure threshold.", + "description": "API server Startup probe failure threshold.", "type": "integer", "default": 6 }, "periodSeconds": { - "description": "RPC server Startup probe period seconds.", + "description": "API server Startup probe period seconds.", "type": "integer", "default": 10 }, "scheme": { - "description": "RPC server Startup probe scheme.", + "description": "API server Startup probe scheme.", "type": "string", "default": "HTTP" } } }, "replicas": { - "description": "How many Airflow RPC server replicas should run.", + "description": "How many Airflow API server replicas should run.", "type": "integer", "default": 1 }, @@ -4820,7 +4815,7 @@ "x-docsSection": null }, "command": { - "description": "Command to use when running the Airflow RPC server (templated).", + "description": "Command to use when running the Airflow API server (templated).", "type": [ "array", "null" @@ -4828,12 +4823,10 @@ "items": { "type": "string" }, - "default": [ - "bash" - ] + "default": null }, "args": { - "description": "Args to use when running the Airflow RPC server (templated).", + "description": "Args to use when running the Airflow API server (templated).", "type": [ "array", "null" @@ -4842,8 +4835,9 @@ "type": "string" }, "default": [ + "bash", "-c", - "exec airflow internal-api" + "exec airflow fastapi-api" ] }, "strategy": { @@ -4877,7 +4871,7 @@ "default": null }, "annotations": { - "description": "Annotations to add to the RPC server Kubernetes ServiceAccount.", + "description": "Annotations to add to the API server Kubernetes ServiceAccount.", "type": "object", "default": {}, "additionalProperties": { @@ -4887,7 +4881,7 @@ } }, "podDisruptionBudget": { - "description": "RPC server pod disruption budget.", + "description": "API server pod disruption budget.", "type": "object", "additionalProperties": false, "properties": { @@ -4902,7 +4896,7 @@ "additionalProperties": false, "properties": { "maxUnavailable": { - "description": "Max unavailable pods for RPC server.", + "description": "Max unavailable pods for API server.", "type": [ "integer", "string" @@ -4910,7 +4904,7 @@ "default": 1 }, "minAvailable": { - "description": "Min available pods for RPC server.", + "description": "Min available pods for API server.", "type": [ "integer", "string" @@ -4921,27 +4915,18 @@ } } }, - "extraNetworkPolicies": { - "description": "Additional NetworkPolicies as needed (Deprecated - renamed to `RPC server.networkPolicy.ingress.from`).", - "type": "array", - "items": { - "type": "object", - "$ref": "#/definitions/io.k8s.api.networking.v1.NetworkPolicyPeer" - }, - "default": [] - }, "networkPolicy": { - "description": "RPC server NetworkPolicy configuration", + "description": "API server NetworkPolicy configuration", "type": "object", "additionalProperties": false, "properties": { "ingress": { - "description": "RPC server NetworkPolicyingress configuration", + "description": "API server NetworkPolicyingress configuration", "type": "object", "additionalProperties": false, "properties": { "from": { - "description": "Peers for RPC server NetworkPolicyingress.", + "description": "Peers for API server NetworkPolicyingress.", "type": "array", "default": [], "items": { @@ -4949,19 +4934,19 @@ } }, "ports": { - "description": "Ports for RPC server NetworkPolicyingress (if `from` is set).", + "description": "Ports for API server NetworkPolicyingress (if `from` is set).", "type": "array", "items": { "$ref": "#/definitions/io.k8s.api.networking.v1.NetworkPolicyPort" }, "default": [ { - "port": "{{ .Values.ports._rpcServer }}" + "port": "{{ .Values.ports.apiServer }}" } ], "examples": [ { - "port": 9080 + "port": 9091 } ] } @@ -4970,7 +4955,7 @@ } }, "containerLifecycleHooks": { - "description": "Container Lifecycle Hooks definition for the RPC server. If not set, the values from global `containerLifecycleHooks` will be used.", + "description": "Container Lifecycle Hooks definition for the API server. If not set, the values from global `containerLifecycleHooks` will be used.", "type": "object", "$ref": "#/definitions/io.k8s.api.core.v1.Lifecycle", "default": {}, @@ -4999,12 +4984,12 @@ ] }, "securityContexts": { - "description": "Security context definition for the RPC server. If not set, the values from global `securityContexts` will be used.", + "description": "Security context definition for the API server. If not set, the values from global `securityContexts` will be used.", "type": "object", "x-docsSection": "Kubernetes", "properties": { "pod": { - "description": "Pod security context definition for the RPC server.", + "description": "Pod security context definition for the API server.", "type": "object", "$ref": "#/definitions/io.k8s.api.core.v1.PodSecurityContext", "default": {}, @@ -5018,7 +5003,7 @@ ] }, "container": { - "description": "Container security context definition for the RPC server.", + "description": "Container security context definition for the API server.", "type": "object", "$ref": "#/definitions/io.k8s.api.core.v1.SecurityContext", "default": {}, @@ -5037,7 +5022,7 @@ } }, "resources": { - "description": "Resources for RPC server pods.", + "description": "Resources for API server pods.", "type": "object", "default": {}, "examples": [ @@ -5098,7 +5083,7 @@ } }, "extraContainers": { - "description": "Launch additional containers into RPC server.", + "description": "Launch additional containers into API server.", "type": "array", "default": [], "items": { @@ -5106,7 +5091,7 @@ } }, "extraInitContainers": { - "description": "Add additional init containers into RPC server.", + "description": "Add additional init containers into API server.", "type": "array", "default": [], "items": { @@ -5114,7 +5099,7 @@ } }, "extraVolumes": { - "description": "Mount additional volumes into RPC server.", + "description": "Mount additional volumes into API server.", "type": "array", "default": [], "items": { @@ -5122,49 +5107,25 @@ } }, "extraVolumeMounts": { - "description": "Mount additional volumes into RPC server.", + "description": "Mount additional volumes into API server.", "type": "array", "default": [], "items": { "$ref": "#/definitions/io.k8s.api.core.v1.VolumeMount" } }, - "RPC serverConfig": { - "description": "This string (can be templated) will be mounted into the Airflow RPC server as a custom `RPC server_config.py`. You can bake a `RPC server_config.py` in to your image instead or specify a configmap containing the RPC server_config.py.", - "type": [ - "string", - "null" - ], - "x-docsSection": "Common", - "default": null, - "examples": [ - "from airflow import configuration as conf\n\n# The SQLAlchemy connection string.\nSQLALCHEMY_DATABASE_URI = conf.get('database', 'SQL_ALCHEMY_CONN')\n\n# Flask-WTF flag for CSRF\nCSRF_ENABLED = True" - ] - }, - "RPC serverConfigConfigMapName": { - "description": "The configmap name containing the RPC server_config.py.", - "type": [ - "string", - "null" - ], - "x-docsSection": "Common", - "default": null, - "examples": [ - "my-RPC server-configmap" - ] - }, "service": { - "description": "RPC server Service configuration.", + "description": "API server Service configuration.", "type": "object", "additionalProperties": false, "properties": { "type": { - "description": "RPC server Service type.", + "description": "API server Service type.", "type": "string", "default": "ClusterIP" }, "annotations": { - "description": "Annotations for the RPC server Service.", + "description": "Annotations for the API server Service.", "type": "object", "default": {}, "additionalProperties": { @@ -5172,7 +5133,7 @@ } }, "ports": { - "description": "Ports for the RPC server Service.", + "description": "Ports for the API server Service.", "type": "array", "items": { "type": "object", @@ -5206,15 +5167,15 @@ }, "default": [ { - "name": "rpc-server", - "port": "{{ .Values.ports._rpcServer }}" + "name": "api-server", + "port": "{{ .Values.ports.apiServer }}" } ], "examples": [ { - "name": "rpc-server", - "port": 9080, - "targetPort": "rpc-server" + "name": "api-server", + "port": 9091, + "targetPort": "api-server" }, { "name": "only_sidecar", @@ -5224,7 +5185,7 @@ ] }, "loadBalancerIP": { - "description": "RPC server Service loadBalancerIP.", + "description": "API server Service loadBalancerIP.", "type": [ "string", "null" @@ -5232,7 +5193,7 @@ "default": null }, "loadBalancerSourceRanges": { - "description": "RPC server Service ``loadBalancerSourceRanges``.", + "description": "API server Service ``loadBalancerSourceRanges``.", "type": "array", "items": { "type": "string" @@ -5245,7 +5206,7 @@ } }, "nodeSelector": { - "description": "Select certain nodes for RPC server pods.", + "description": "Select certain nodes for API server pods.", "type": "object", "default": {}, "additionalProperties": { @@ -5253,7 +5214,7 @@ } }, "priorityClassName": { - "description": "Specify priority for RPC server pods.", + "description": "Specify priority for API server pods.", "type": [ "string", "null" @@ -5261,13 +5222,13 @@ "default": null }, "affinity": { - "description": "Specify scheduling constraints for RPC server pods.", + "description": "Specify scheduling constraints for API server pods.", "type": "object", "default": "See values.yaml", "$ref": "#/definitions/io.k8s.api.core.v1.Affinity" }, "tolerations": { - "description": "Specify Tolerations for RPC server pods.", + "description": "Specify Tolerations for API server pods.", "type": "array", "default": [], "items": { @@ -5276,7 +5237,7 @@ } }, "topologySpreadConstraints": { - "description": "Specify topology spread constraints for RPC server pods.", + "description": "Specify topology spread constraints for API server pods.", "type": "array", "default": [], "x-docsSection": "Kubernetes", @@ -5286,7 +5247,7 @@ } }, "annotations": { - "description": "Annotations to add to the RPC server deployment", + "description": "Annotations to add to the API server deployment", "type": "object", "default": {}, "additionalProperties": { @@ -5294,7 +5255,7 @@ } }, "podAnnotations": { - "description": "Annotations to add to the RPC server pods.", + "description": "Annotations to add to the API server pods.", "type": "object", "default": {}, "additionalProperties": { @@ -5302,7 +5263,7 @@ } }, "labels": { - "description": "Labels to add to the RPC server objects and pods.", + "description": "Labels to add to the API server objects and pods.", "type": "object", "default": {}, "additionalProperties": { @@ -5367,7 +5328,7 @@ } }, "env": { - "description": "Add additional env vars to RPC server.", + "description": "Add additional env vars to API server.", "type": "array", "default": [], "items": { @@ -8099,10 +8060,10 @@ "type": "integer", "default": 8080 }, - "_rpcServer": { - "description": "RPC server port (AIP-44). Experimental / dev purpose only.", + "apiServer": { + "description": "API server port.", "type": "integer", - "default": 9080 + "default": 9091 }, "workerLogs": { "description": "Worker logs port.", diff --git a/chart/values.yaml b/chart/values.yaml index 495fa25749cf3..89aebfeb86bdc 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -1243,18 +1243,15 @@ migrateDatabaseJob: applyCustomEnv: true env: [] -# rpcServer support is experimental / dev purpose only and will later be renamed -_rpcServer: - enabled: false +apiServer: # Labels specific to workers objects and pods labels: {} - # Command to use when running the Airflow rpc server (templated). - command: - - "bash" - # Args to use when running the Airflow rpc server (templated). - args: ["-c", "exec airflow internal-api"] + # Command to use when running the Airflow API server (templated). + command: ~ + # Args to use when running the Airflow API server (templated). + args: ["bash", "-c", "exec airflow fastapi-api"] env: [] serviceAccount: # default value is true @@ -1273,8 +1270,8 @@ _rpcServer: ## service annotations annotations: {} ports: - - name: rpc-server - port: "{{ .Values.ports._rpcServer }}" + - name: api-server + port: "{{ .Values.ports.apiServer }}" loadBalancerIP: ~ ## Limit load balancer source ips to list of CIDRs @@ -1307,15 +1304,13 @@ _rpcServer: # Launch additional containers into the flower pods. extraContainers: [] - # Additional network policies as needed (Deprecated - renamed to `webserver.networkPolicy.ingress.from`) - extraNetworkPolicies: [] networkPolicy: ingress: # Peers for webserver NetworkPolicy ingress from: [] # Ports for webserver NetworkPolicy ingress (if `from` is set) ports: - - port: "{{ .Values.ports._rpcServer }}" + - port: "{{ .Values.ports.apiServer }}" resources: {} # limits: @@ -1339,8 +1334,6 @@ _rpcServer: periodSeconds: 10 scheme: HTTP - # Wait for at most 1 minute (6*10s) for the RPC server container to startup. - # livenessProbe kicks in after the first successful startupProbe startupProbe: timeoutSeconds: 20 failureThreshold: 6 @@ -2491,8 +2484,7 @@ ports: statsdScrape: 9102 pgbouncer: 6543 pgbouncerScrape: 9127 - # rpcServer support is experimental / dev purpose only and will later be renamed - _rpcServer: 9080 + apiServer: 9091 # Define any ResourceQuotas for namespace quotas: {} diff --git a/dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py b/dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py index f97b6b573a463..2f027d5ad9767 100644 --- a/dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py +++ b/dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py @@ -998,7 +998,6 @@ def _deploy_helm_chart( get_console(output=output).print(f"[info]Copied chart sources to {tmp_chart_path}") kubectl_context = get_kubectl_cluster_name(python=python, kubernetes_version=kubernetes_version) params = BuildProdParams(python=python) - airflow_kubernetes_image_name = params.airflow_image_kubernetes helm_command = [ "helm", "upgrade" if upgrade else "install", @@ -1011,13 +1010,13 @@ def _deploy_helm_chart( "--namespace", HELM_AIRFLOW_NAMESPACE, "--set", - f"defaultAirflowRepository={airflow_kubernetes_image_name}", + f"defaultAirflowRepository={params.airflow_image_kubernetes}", "--set", "defaultAirflowTag=latest", "-v", "1", "--set", - f"images.airflow.repository={airflow_kubernetes_image_name}", + f"images.airflow.repository={params.airflow_image_kubernetes}", "--set", "images.airflow.tag=latest", "-v", @@ -1028,6 +1027,8 @@ def _deploy_helm_chart( "config.logging.logging_level=DEBUG", "--set", f"executor={executor}", + "--set", + f"airflowVersion={params.airflow_semver_version}", ] if multi_namespace_mode: helm_command.extend(["--set", "multiNamespaceMode=true"]) diff --git a/dev/breeze/src/airflow_breeze/params/build_prod_params.py b/dev/breeze/src/airflow_breeze/params/build_prod_params.py index 6fa5828b40fa5..71405680db841 100644 --- a/dev/breeze/src/airflow_breeze/params/build_prod_params.py +++ b/dev/breeze/src/airflow_breeze/params/build_prod_params.py @@ -63,6 +63,18 @@ def airflow_version(self) -> str: else: return self._get_version_with_suffix() + @property + def airflow_semver_version(self) -> str: + """The airflow version in SemVer compatible form""" + from packaging.version import Version + + pyVer = Version(self.airflow_version) + + ver = pyVer.base_version + # if (dev := pyVer.dev) is not None: + # ver += f"-dev.{dev}" + return ver + @property def image_type(self) -> str: return "PROD" diff --git a/helm_tests/airflow_aux/test_basic_helm_chart.py b/helm_tests/airflow_aux/test_basic_helm_chart.py index 7d72a3205eeb4..c9dff5476e530 100644 --- a/helm_tests/airflow_aux/test_basic_helm_chart.py +++ b/helm_tests/airflow_aux/test_basic_helm_chart.py @@ -82,9 +82,8 @@ def _get_object_count(self, version): return OBJECT_COUNT_IN_BASIC_DEPLOYMENT + 1 return OBJECT_COUNT_IN_BASIC_DEPLOYMENT - @pytest.mark.parametrize("version", ["2.3.2", "2.4.0", "default"]) + @pytest.mark.parametrize("version", ["2.3.2", "2.4.0", "3.0.0", "default"]) def test_basic_deployments(self, version): - expected_object_count_in_basic_deployment = self._get_object_count(version) k8s_objects = render_chart( "test-basic", self._get_values_with_version( @@ -140,10 +139,19 @@ def test_basic_deployments(self, version): } if version == "2.3.2": expected.add(("Secret", "test-basic-result-backend")) - if version == "default": + if version == "3.0.0": + expected.update( + ( + ("Deployment", "test-basic-api-server"), + ("Service", "test-basic-api-server"), + ("ServiceAccount", "test-basic-api-server"), + ("Service", "test-basic-triggerer"), + ) + ) + elif version == "default": expected.add(("Service", "test-basic-triggerer")) assert list_of_kind_names_tuples == expected - assert len(k8s_objects) == expected_object_count_in_basic_deployment + assert len(k8s_objects) == len(expected) for k8s_object in k8s_objects: labels = jmespath.search("metadata.labels", k8s_object) or {} if "helm.sh/chart" in labels: @@ -165,24 +173,8 @@ def test_basic_deployments_with_standard_naming(self): actual = {(x["kind"], x["metadata"]["name"]) for x in k8s_objects} assert actual == DEFAULT_OBJECTS_STD_NAMING - def test_basic_deployment_with_rpc_server(self): - extra_objects = { - ("Deployment", "test-basic-airflow-rpc-server"), - ("Service", "test-basic-airflow-rpc-server"), - ("ServiceAccount", "test-basic-airflow-rpc-server"), - } - k8s_objects = render_chart( - "test-basic", - values={"_rpcServer": {"enabled": True}, "useStandardNaming": True}, - ) - actual = {(x["kind"], x["metadata"]["name"]) for x in k8s_objects} - assert actual == (DEFAULT_OBJECTS_STD_NAMING | extra_objects) - @pytest.mark.parametrize("version", ["2.3.2", "2.4.0", "default"]) def test_basic_deployment_with_standalone_dag_processor(self, version): - # Dag Processor creates two extra objects compared to the basic deployment - object_count_in_basic_deployment = self._get_object_count(version) - expected_object_count_with_standalone_scheduler = object_count_in_basic_deployment + 2 k8s_objects = render_chart( "test-basic", self._get_values_with_version( @@ -244,7 +236,7 @@ def test_basic_deployment_with_standalone_dag_processor(self, version): if version == "default": expected.add(("Service", "test-basic-triggerer")) assert list_of_kind_names_tuples == expected - assert expected_object_count_with_standalone_scheduler == len(k8s_objects) + assert len(k8s_objects) == len(expected) for k8s_object in k8s_objects: labels = jmespath.search("metadata.labels", k8s_object) or {} if "helm.sh/chart" in labels: @@ -522,11 +514,7 @@ def get_k8s_objs_with_image(obj: list[Any] | dict[str, Any]) -> list[dict[str, A for obj in objs_with_image: image: str = obj["image"] if image.startswith(image_repo): - # Make sure that a command is not specified - if obj["name"] == "rpc-server": - assert obj["command"] == ["bash"] - else: - assert "command" not in obj + assert "command" not in obj @pytest.mark.parametrize( "executor", @@ -704,6 +692,6 @@ def test_redis_broker_connection_url_use_standard_naming(self): @staticmethod def default_trigger_obj(version): - if version == "default": + if version in {"default", "3.0.0"}: return "StatefulSet" return "Deployment" diff --git a/helm_tests/airflow_core/test_rpc_server.py b/helm_tests/airflow_core/test_api_server.py similarity index 65% rename from helm_tests/airflow_core/test_rpc_server.py rename to helm_tests/airflow_core/test_api_server.py index 7bbc944cfd6c9..91e6418f1db5e 100644 --- a/helm_tests/airflow_core/test_rpc_server.py +++ b/helm_tests/airflow_core/test_api_server.py @@ -21,162 +21,46 @@ import jmespath import pytest -from tests.charts.helm_template_generator import render_chart +from tests.charts.helm_template_generator import render_chart as _render_chart -class TestRPCServerDeployment: - """Tests rpc-server deployment.""" +# Everything in here needcs to set airflowVersion to get the API server to render +def render_chart(values=None, **kwargs): + values = values or {} + values.setdefault("airflowVersion", "3.0.0") + return _render_chart(values=values, **kwargs) - def test_is_disabled_by_default(self): - """ - RPC server should be disabled by default. - """ - docs = render_chart( - values={"_rpcServer": {}}, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], - ) - - assert len(docs) == 0 - - def test_should_add_host_header_to_liveness_and_readiness_and_startup_probes(self): - docs = render_chart( - values={ - "_rpcServer": {"enabled": True}, - "config": {"core": {"internal_api_url": "https://example.com:21222/mypath/path"}}, - }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], - ) - - assert {"name": "Host", "value": "example.com"} in jmespath.search( - "spec.template.spec.containers[0].livenessProbe.httpGet.httpHeaders", docs[0] - ) - assert {"name": "Host", "value": "example.com"} in jmespath.search( - "spec.template.spec.containers[0].readinessProbe.httpGet.httpHeaders", docs[0] - ) - assert {"name": "Host", "value": "example.com"} in jmespath.search( - "spec.template.spec.containers[0].startupProbe.httpGet.httpHeaders", docs[0] - ) - def test_should_add_path_to_liveness_and_readiness_and_startup_probes(self): - docs = render_chart( - values={ - "_rpcServer": { - "enabled": True, - }, - "config": {"core": {"internal_api_url": "https://example.com:21222/mypath/path"}}, - }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], - ) - - assert ( - jmespath.search("spec.template.spec.containers[0].livenessProbe.httpGet.path", docs[0]) - == "/mypath/path/internal_api/v1/health" - ) - assert ( - jmespath.search("spec.template.spec.containers[0].readinessProbe.httpGet.path", docs[0]) - == "/mypath/path/internal_api/v1/health" - ) - assert ( - jmespath.search("spec.template.spec.containers[0].startupProbe.httpGet.path", docs[0]) - == "/mypath/path/internal_api/v1/health" - ) +class TestAPIServerDeployment: + """Tests api-server deployment.""" @pytest.mark.parametrize( "revision_history_limit, global_revision_history_limit", [(8, 10), (10, 8), (8, None), (None, 10), (None, None)], ) def test_revision_history_limit(self, revision_history_limit, global_revision_history_limit): - values = { - "_rpcServer": { - "enabled": True, - } - } + values = {"apiServer": {}} if revision_history_limit: - values["_rpcServer"]["revisionHistoryLimit"] = revision_history_limit + values["apiServer"]["revisionHistoryLimit"] = revision_history_limit if global_revision_history_limit: values["revisionHistoryLimit"] = global_revision_history_limit docs = render_chart( values=values, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) expected_result = revision_history_limit if revision_history_limit else global_revision_history_limit assert jmespath.search("spec.revisionHistoryLimit", docs[0]) == expected_result - @pytest.mark.parametrize( - "values", - [ - {"_rpcServer": {"enabled": True}}, - { - "_rpcServer": {"enabled": True}, - "config": {"core": {"internal_api_url": ""}}, - }, - ], - ) - def test_should_not_contain_host_header(self, values): - docs = render_chart(values=values, show_only=["templates/rpc-server/rpc-server-deployment.yaml"]) - - assert ( - jmespath.search("spec.template.spec.containers[0].livenessProbe.httpGet.httpHeaders", docs[0]) - is None - ) - assert ( - jmespath.search("spec.template.spec.containers[0].readinessProbe.httpGet.httpHeaders", docs[0]) - is None - ) - assert ( - jmespath.search("spec.template.spec.containers[0].startupProbe.httpGet.httpHeaders", docs[0]) - is None - ) - - def test_should_use_templated_base_url_for_probes(self): - docs = render_chart( - values={ - "_rpcServer": { - "enabled": True, - }, - "config": { - "core": { - "internal_api_url": "https://{{ .Release.Name }}.com:21222/mypath/{{ .Release.Name }}/path" - } - }, - }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], - ) - container = jmespath.search("spec.template.spec.containers[0]", docs[0]) - - assert {"name": "Host", "value": "release-name.com"} in jmespath.search( - "livenessProbe.httpGet.httpHeaders", container - ) - assert {"name": "Host", "value": "release-name.com"} in jmespath.search( - "readinessProbe.httpGet.httpHeaders", container - ) - assert {"name": "Host", "value": "release-name.com"} in jmespath.search( - "startupProbe.httpGet.httpHeaders", container - ) - assert ( - jmespath.search("livenessProbe.httpGet.path", container) - == "/mypath/release-name/path/internal_api/v1/health" - ) - assert ( - jmespath.search("readinessProbe.httpGet.path", container) - == "/mypath/release-name/path/internal_api/v1/health" - ) - assert ( - jmespath.search("startupProbe.httpGet.path", container) - == "/mypath/release-name/path/internal_api/v1/health" - ) - def test_should_add_scheme_to_liveness_and_readiness_and_startup_probes(self): docs = render_chart( values={ - "_rpcServer": { - "enabled": True, + "apiServer": { "livenessProbe": {"scheme": "HTTPS"}, "readinessProbe": {"scheme": "HTTPS"}, "startupProbe": {"scheme": "HTTPS"}, } }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) assert "HTTPS" in jmespath.search( @@ -193,14 +77,13 @@ def test_should_add_extra_containers(self): docs = render_chart( values={ "executor": "CeleryExecutor", - "_rpcServer": { - "enabled": True, + "apiServer": { "extraContainers": [ {"name": "{{.Chart.Name}}", "image": "test-registry/test-repo:test-tag"} ], }, }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) assert jmespath.search("spec.template.spec.containers[-1]", docs[0]) == { @@ -211,12 +94,11 @@ def test_should_add_extra_containers(self): def test_should_add_extraEnvs(self): docs = render_chart( values={ - "_rpcServer": { - "enabled": True, + "apiServer": { "env": [{"name": "TEST_ENV_1", "value": "test_env_1"}], }, }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) assert {"name": "TEST_ENV_1", "value": "test_env_1"} in jmespath.search( @@ -226,15 +108,14 @@ def test_should_add_extraEnvs(self): def test_should_add_extra_volume_and_extra_volume_mount(self): docs = render_chart( values={ - "_rpcServer": { - "enabled": True, + "apiServer": { "extraVolumes": [{"name": "test-volume-{{ .Chart.Name }}", "emptyDir": {}}], "extraVolumeMounts": [ {"name": "test-volume-{{ .Chart.Name }}", "mountPath": "/opt/test"} ], }, }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) assert jmespath.search("spec.template.spec.volumes[-1].name", docs[0]) == "test-volume-airflow" @@ -250,11 +131,10 @@ def test_should_add_extra_volume_and_extra_volume_mount(self): def test_should_add_global_volume_and_global_volume_mount(self): docs = render_chart( values={ - "_rpcServer": {"enabled": True}, "volumes": [{"name": "test-volume", "emptyDir": {}}], "volumeMounts": [{"name": "test-volume", "mountPath": "/opt/test"}], }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) assert jmespath.search("spec.template.spec.volumes[-1].name", docs[0]) == "test-volume" @@ -266,35 +146,23 @@ def test_should_add_global_volume_and_global_volume_mount(self): def test_should_add_extraEnvs_to_wait_for_migration_container(self): docs = render_chart( values={ - "_rpcServer": { - "enabled": True, + "apiServer": { "waitForMigrations": { "env": [{"name": "TEST_ENV_1", "value": "test_env_1"}], }, }, }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) assert {"name": "TEST_ENV_1", "value": "test_env_1"} in jmespath.search( "spec.template.spec.initContainers[0].env", docs[0] ) - @pytest.mark.parametrize( - "airflow_version, expected_arg", - [ - ("2.0.0", ["airflow", "db", "check-migrations", "--migration-wait-timeout=60"]), - ("2.1.0", ["airflow", "db", "check-migrations", "--migration-wait-timeout=60"]), - ("1.10.2", ["python", "-c"]), - ], - ) - def test_wait_for_migration_airflow_version(self, airflow_version, expected_arg): + def test_wait_for_migration_airflow_version(self): + expected_arg = ["airflow", "db", "check-migrations", "--migration-wait-timeout=60"] docs = render_chart( - values={ - "_rpcServer": {"enabled": True}, - "airflowVersion": airflow_version, - }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) # Don't test the full string, just the length of the expect matches actual = jmespath.search("spec.template.spec.initContainers[0].args", docs[0]) @@ -303,12 +171,11 @@ def test_wait_for_migration_airflow_version(self, airflow_version, expected_arg) def test_disable_wait_for_migration(self): docs = render_chart( values={ - "_rpcServer": { - "enabled": True, + "apiServer": { "waitForMigrations": {"enabled": False}, }, }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) actual = jmespath.search( "spec.template.spec.initContainers[?name=='wait-for-airflow-migrations']", docs[0] @@ -318,14 +185,13 @@ def test_disable_wait_for_migration(self): def test_should_add_extra_init_containers(self): docs = render_chart( values={ - "_rpcServer": { - "enabled": True, + "apiServer": { "extraInitContainers": [ {"name": "test-init-container", "image": "test-registry/test-repo:test-tag"} ], }, }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) assert jmespath.search("spec.template.spec.initContainers[-1]", docs[0]) == { @@ -336,12 +202,11 @@ def test_should_add_extra_init_containers(self): def test_should_add_component_specific_labels(self): docs = render_chart( values={ - "_rpcServer": { - "enabled": True, + "apiServer": { "labels": {"test_label": "test_label_value"}, }, }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) assert "test_label" in jmespath.search("spec.template.metadata.labels", docs[0]) @@ -350,8 +215,7 @@ def test_should_add_component_specific_labels(self): def test_should_create_valid_affinity_tolerations_and_node_selector(self): docs = render_chart( values={ - "_rpcServer": { - "enabled": True, + "apiServer": { "affinity": { "nodeAffinity": { "requiredDuringSchedulingIgnoredDuringExecution": { @@ -371,7 +235,7 @@ def test_should_create_valid_affinity_tolerations_and_node_selector(self): "nodeSelector": {"diskType": "ssd"}, } }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) assert jmespath.search("kind", docs[0]) == "Deployment" @@ -403,8 +267,7 @@ def test_should_create_valid_affinity_tolerations_and_node_selector(self): def test_should_create_default_affinity(self): docs = render_chart( - values={"_rpcServer": {"enabled": True}}, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) assert jmespath.search( @@ -412,10 +275,10 @@ def test_should_create_default_affinity(self): "preferredDuringSchedulingIgnoredDuringExecution[0]." "podAffinityTerm.labelSelector.matchLabels", docs[0], - ) == {"component": "rpc-server"} + ) == {"component": "api-server"} def test_affinity_tolerations_topology_spread_constraints_and_node_selector_precedence(self): - """When given both global and rpc-server affinity etc, rpc-server affinity etc is used.""" + """When given both global and api-server affinity etc, api-server affinity etc is used.""" expected_affinity = { "nodeAffinity": { "requiredDuringSchedulingIgnoredDuringExecution": { @@ -437,8 +300,7 @@ def test_affinity_tolerations_topology_spread_constraints_and_node_selector_prec } docs = render_chart( values={ - "_rpcServer": { - "enabled": True, + "apiServer": { "affinity": expected_affinity, "tolerations": [ {"key": "dynamic-pods", "operator": "Equal", "value": "true", "effect": "NoSchedule"} @@ -473,7 +335,7 @@ def test_affinity_tolerations_topology_spread_constraints_and_node_selector_prec ], "nodeSelector": {"type": "not-me"}, }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) assert expected_affinity == jmespath.search("spec.template.spec.affinity", docs[0]) @@ -493,8 +355,8 @@ def test_affinity_tolerations_topology_spread_constraints_and_node_selector_prec def test_scheduler_name(self): docs = render_chart( - values={"_rpcServer": {"enabled": True}, "schedulerName": "airflow-scheduler"}, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + values={"schedulerName": "airflow-scheduler"}, + show_only=["templates/api-server/api-server-deployment.yaml"], ) assert ( @@ -515,8 +377,8 @@ def test_scheduler_name(self): ) def test_logs_persistence_adds_volume_and_mount(self, log_persistence_values, expected_claim_name): docs = render_chart( - values={"_rpcServer": {"enabled": True}, "logs": {"persistence": log_persistence_values}}, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + values={"logs": {"persistence": log_persistence_values}}, + show_only=["templates/api-server/api-server-deployment.yaml"], ) if expected_claim_name: @@ -536,8 +398,7 @@ def test_logs_persistence_adds_volume_and_mount(self, log_persistence_values, ex def test_config_volumes(self): docs = render_chart( - values={"_rpcServer": {"enabled": True}}, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) # default config @@ -548,18 +409,17 @@ def test_config_volumes(self): "subPath": "airflow.cfg", } in jmespath.search("spec.template.spec.containers[0].volumeMounts", docs[0]) - def test_rpc_server_resources_are_configurable(self): + def testapi_server_resources_are_configurable(self): docs = render_chart( values={ - "_rpcServer": { - "enabled": True, + "apiServer": { "resources": { "limits": {"cpu": "200m", "memory": "128Mi"}, "requests": {"cpu": "300m", "memory": "169Mi"}, }, }, }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) assert jmespath.search("spec.template.spec.containers[0].resources.limits.memory", docs[0]) == "128Mi" assert jmespath.search("spec.template.spec.containers[0].resources.limits.cpu", docs[0]) == "200m" @@ -584,11 +444,10 @@ def test_rpc_server_resources_are_configurable(self): jmespath.search("spec.template.spec.initContainers[0].resources.requests.cpu", docs[0]) == "300m" ) - def test_rpc_server_security_contexts_are_configurable(self): + def test_api_server_security_contexts_are_configurable(self): docs = render_chart( values={ - "_rpcServer": { - "enabled": True, + "apiServer": { "securityContexts": { "pod": { "fsGroup": 1000, @@ -603,7 +462,7 @@ def test_rpc_server_security_contexts_are_configurable(self): }, }, }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) assert jmespath.search("spec.template.spec.containers[0].securityContext", docs[0]) == { "allowPrivilegeEscalation": False, @@ -617,12 +476,11 @@ def test_rpc_server_security_contexts_are_configurable(self): "runAsNonRoot": True, } - def test_rpc_server_security_context_legacy(self): + def test_api_server_security_context_legacy(self): with pytest.raises(CalledProcessError, match="Additional property securityContext is not allowed"): render_chart( values={ - "_rpcServer": { - "enabled": True, + "apiServer": { "securityContext": { "fsGroup": 1000, "runAsGroup": 1001, @@ -631,61 +489,59 @@ def test_rpc_server_security_context_legacy(self): }, }, }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) - def test_rpc_server_resources_are_not_added_by_default(self): + def test_api_server_resources_are_not_added_by_default(self): docs = render_chart( - values={"_rpcServer": {"enabled": True}}, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) assert jmespath.search("spec.template.spec.containers[0].resources", docs[0]) == {} assert jmespath.search("spec.template.spec.initContainers[0].resources", docs[0]) == {} @pytest.mark.parametrize( - "airflow_version, expected_strategy", + ("airflow_version", "strategy", "expected_strategy"), [ - ("2.0.2", {"type": "RollingUpdate", "rollingUpdate": {"maxSurge": 1, "maxUnavailable": 0}}), - ("1.10.14", {"type": "Recreate"}), - ("1.9.0", {"type": "Recreate"}), - ("2.1.0", {"type": "RollingUpdate", "rollingUpdate": {"maxSurge": 1, "maxUnavailable": 0}}), + pytest.param( + "3.0.0", + None, + {"type": "RollingUpdate", "rollingUpdate": {"maxSurge": 1, "maxUnavailable": 0}}, + id="default", + ), + pytest.param( + "3.0.0", + {"type": "RollingUpdate", "rollingUpdate": {"maxUnavailable": 1}}, + {"type": "RollingUpdate", "rollingUpdate": {"maxSurge": 1, "maxUnavailable": 0}}, + id="custom-strategy", + ), ], ) - def test_default_update_strategy(self, airflow_version, expected_strategy): + def test_update_strategy(self, airflow_version, strategy, expected_strategy): docs = render_chart( - values={"_rpcServer": {"enabled": True}, "airflowVersion": airflow_version}, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], - ) - - assert jmespath.search("spec.strategy", docs[0]) == expected_strategy - - def test_update_strategy(self): - expected_strategy = {"type": "RollingUpdate", "rollingUpdate": {"maxUnavailable": 1}} - docs = render_chart( - values={"_rpcServer": {"enabled": True, "strategy": expected_strategy}}, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + values={"airflowVersion": airflow_version, "apiServer": {"strategy": expected_strategy}}, + show_only=["templates/api-server/api-server-deployment.yaml"], ) assert jmespath.search("spec.strategy", docs[0]) == expected_strategy def test_default_command_and_args(self): docs = render_chart( - values={"_rpcServer": {"enabled": True}}, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) - assert jmespath.search("spec.template.spec.containers[0].command", docs[0]) == ["bash"] + assert jmespath.search("spec.template.spec.containers[0].command", docs[0]) is None assert jmespath.search("spec.template.spec.containers[0].args", docs[0]) == [ + "bash", "-c", - "exec airflow internal-api", + "exec airflow fastapi-api", ] @pytest.mark.parametrize("command", [None, ["custom", "command"]]) @pytest.mark.parametrize("args", [None, ["custom", "args"]]) def test_command_and_args_overrides(self, command, args): docs = render_chart( - values={"_rpcServer": {"enabled": True, "command": command, "args": args}}, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + values={"apiServer": {"command": command, "args": args}}, + show_only=["templates/api-server/api-server-deployment.yaml"], ) assert command == jmespath.search("spec.template.spec.containers[0].command", docs[0]) @@ -694,13 +550,12 @@ def test_command_and_args_overrides(self, command, args): def test_command_and_args_overrides_are_templated(self): docs = render_chart( values={ - "_rpcServer": { - "enabled": True, + "apiServer": { "command": ["{{ .Release.Name }}"], "args": ["{{ .Release.Service }}"], } }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + show_only=["templates/api-server/api-server-deployment.yaml"], ) assert jmespath.search("spec.template.spec.containers[0].command", docs[0]) == ["release-name"] @@ -708,57 +563,45 @@ def test_command_and_args_overrides_are_templated(self): def test_should_add_component_specific_annotations(self): docs = render_chart( - values={ - "_rpcServer": { - "enabled": True, - "annotations": {"test_annotation": "test_annotation_value"}, - }, - }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + values={"apiServer": {"annotations": {"test_annotation": "test_annotation_value"}}}, + show_only=["templates/api-server/api-server-deployment.yaml"], ) assert "annotations" in jmespath.search("metadata", docs[0]) assert jmespath.search("metadata.annotations", docs[0])["test_annotation"] == "test_annotation_value" - def test_rpc_server_pod_hostaliases(self): + def test_api_server_pod_hostaliases(self): docs = render_chart( - values={ - "_rpcServer": { - "enabled": True, - "hostAliases": [{"ip": "127.0.0.1", "hostnames": ["foo.local"]}], - }, - }, - show_only=["templates/rpc-server/rpc-server-deployment.yaml"], + values={"apiServer": {"hostAliases": [{"ip": "127.0.0.1", "hostnames": ["foo.local"]}]}}, + show_only=["templates/api-server/api-server-deployment.yaml"], ) assert jmespath.search("spec.template.spec.hostAliases[0].ip", docs[0]) == "127.0.0.1" assert jmespath.search("spec.template.spec.hostAliases[0].hostnames[0]", docs[0]) == "foo.local" -class TestRPCServerService: - """Tests rpc-server service.""" +class TestAPIServerService: + """Tests api-server service.""" def test_default_service(self): docs = render_chart( - values={"_rpcServer": {"enabled": True}}, - show_only=["templates/rpc-server/rpc-server-service.yaml"], + show_only=["templates/api-server/api-server-service.yaml"], ) - assert jmespath.search("metadata.name", docs[0]) == "release-name-rpc-server" + assert jmespath.search("metadata.name", docs[0]) == "release-name-api-server" assert jmespath.search("metadata.annotations", docs[0]) is None assert jmespath.search("spec.selector", docs[0]) == { "tier": "airflow", - "component": "rpc-server", + "component": "api-server", "release": "release-name", } assert jmespath.search("spec.type", docs[0]) == "ClusterIP" - assert {"name": "rpc-server", "port": 9080} in jmespath.search("spec.ports", docs[0]) + assert {"name": "api-server", "port": 9091} in jmespath.search("spec.ports", docs[0]) def test_overrides(self): docs = render_chart( values={ - "ports": {"_rpcServer": 9000}, - "_rpcServer": { - "enabled": True, + "ports": {"apiServer": 9000}, + "apiServer": { "service": { "type": "LoadBalancer", "loadBalancerIP": "127.0.0.1", @@ -767,12 +610,12 @@ def test_overrides(self): }, }, }, - show_only=["templates/rpc-server/rpc-server-service.yaml"], + show_only=["templates/api-server/api-server-service.yaml"], ) assert jmespath.search("metadata.annotations", docs[0]) == {"foo": "bar"} assert jmespath.search("spec.type", docs[0]) == "LoadBalancer" - assert {"name": "rpc-server", "port": 9000} in jmespath.search("spec.ports", docs[0]) + assert {"name": "api-server", "port": 9000} in jmespath.search("spec.ports", docs[0]) assert jmespath.search("spec.loadBalancerIP", docs[0]) == "127.0.0.1" assert jmespath.search("spec.loadBalancerSourceRanges", docs[0]) == ["10.123.0.0/16"] @@ -785,19 +628,19 @@ def test_overrides(self): { "name": "{{ .Release.Name }}", "protocol": "UDP", - "port": "{{ .Values.ports._rpcServer }}", + "port": "{{ .Values.ports.apiServer }}", } ], - [{"name": "release-name", "protocol": "UDP", "port": 9080}], + [{"name": "release-name", "protocol": "UDP", "port": 9091}], ), ([{"name": "only_sidecar", "port": "{{ int 9000 }}"}], [{"name": "only_sidecar", "port": 9000}]), ( [ - {"name": "rpc-server", "port": "{{ .Values.ports._rpcServer }}"}, + {"name": "api-server", "port": "{{ .Values.ports.apiServer }}"}, {"name": "sidecar", "port": 80, "targetPort": "sidecar"}, ], [ - {"name": "rpc-server", "port": 9080}, + {"name": "api-server", "port": 9091}, {"name": "sidecar", "port": 80, "targetPort": "sidecar"}, ], ), @@ -805,23 +648,16 @@ def test_overrides(self): ) def test_ports_overrides(self, ports, expected_ports): docs = render_chart( - values={ - "_rpcServer": {"enabled": True, "service": {"ports": ports}}, - }, - show_only=["templates/rpc-server/rpc-server-service.yaml"], + values={"apiServer": {"service": {"ports": ports}}}, + show_only=["templates/api-server/api-server-service.yaml"], ) assert jmespath.search("spec.ports", docs[0]) == expected_ports def test_should_add_component_specific_labels(self): docs = render_chart( - values={ - "_rpcServer": { - "enabled": True, - "labels": {"test_label": "test_label_value"}, - }, - }, - show_only=["templates/rpc-server/rpc-server-service.yaml"], + values={"apiServer": {"labels": {"test_label": "test_label_value"}}}, + show_only=["templates/api-server/api-server-service.yaml"], ) assert "test_label" in jmespath.search("metadata.labels", docs[0]) assert jmespath.search("metadata.labels", docs[0])["test_label"] == "test_label_value" @@ -841,28 +677,20 @@ def test_should_add_component_specific_labels(self): ) def test_nodeport_service(self, ports, expected_ports): docs = render_chart( - values={ - "_rpcServer": { - "enabled": True, - "service": { - "type": "NodePort", - "ports": ports, - }, - }, - }, - show_only=["templates/rpc-server/rpc-server-service.yaml"], + values={"apiServer": {"service": {"type": "NodePort", "ports": ports}}}, + show_only=["templates/api-server/api-server-service.yaml"], ) assert jmespath.search("spec.type", docs[0]) == "NodePort" assert expected_ports == jmespath.search("spec.ports", docs[0]) -class TestRPCServerNetworkPolicy: - """Tests rpc-server network policy.""" +class TestAPIServerNetworkPolicy: + """Tests api-server network policy.""" def test_off_by_default(self): docs = render_chart( - show_only=["templates/rpc-server/rpc-server-networkpolicy.yaml"], + show_only=["templates/api-server/api-server-networkpolicy.yaml"], ) assert len(docs) == 0 @@ -870,8 +698,7 @@ def test_defaults(self): docs = render_chart( values={ "networkPolicies": {"enabled": True}, - "_rpcServer": { - "enabled": True, + "apiServer": { "networkPolicy": { "ingress": { "from": [{"namespaceSelector": {"matchLabels": {"release": "myrelease"}}}] @@ -879,7 +706,7 @@ def test_defaults(self): }, }, }, - show_only=["templates/rpc-server/rpc-server-networkpolicy.yaml"], + show_only=["templates/api-server/api-server-networkpolicy.yaml"], ) assert len(docs) == 1 @@ -887,7 +714,7 @@ def test_defaults(self): assert jmespath.search("spec.ingress[0].from", docs[0]) == [ {"namespaceSelector": {"matchLabels": {"release": "myrelease"}}} ] - assert jmespath.search("spec.ingress[0].ports", docs[0]) == [{"port": 9080}] + assert jmespath.search("spec.ingress[0].ports", docs[0]) == [{"port": 9091}] @pytest.mark.parametrize( "ports, expected_ports", @@ -895,11 +722,11 @@ def test_defaults(self): ([{"port": "sidecar"}], [{"port": "sidecar"}]), ( [ - {"port": "{{ .Values.ports._rpcServer }}"}, + {"port": "{{ .Values.ports.apiServer }}"}, {"port": 80}, ], [ - {"port": 9080}, + {"port": 9091}, {"port": 80}, ], ), @@ -909,8 +736,7 @@ def test_ports_overrides(self, ports, expected_ports): docs = render_chart( values={ "networkPolicies": {"enabled": True}, - "_rpcServer": { - "enabled": True, + "apiServer": { "networkPolicy": { "ingress": { "from": [{"namespaceSelector": {"matchLabels": {"release": "myrelease"}}}], @@ -919,7 +745,7 @@ def test_ports_overrides(self, ports, expected_ports): }, }, }, - show_only=["templates/rpc-server/rpc-server-networkpolicy.yaml"], + show_only=["templates/api-server/api-server-networkpolicy.yaml"], ) assert expected_ports == jmespath.search("spec.ingress[0].ports", docs[0]) @@ -928,30 +754,28 @@ def test_should_add_component_specific_labels(self): docs = render_chart( values={ "networkPolicies": {"enabled": True}, - "_rpcServer": { - "enabled": True, + "apiServer": { "labels": {"test_label": "test_label_value"}, }, }, - show_only=["templates/rpc-server/rpc-server-networkpolicy.yaml"], + show_only=["templates/api-server/api-server-networkpolicy.yaml"], ) assert "test_label" in jmespath.search("metadata.labels", docs[0]) assert jmespath.search("metadata.labels", docs[0])["test_label"] == "test_label_value" -class TestRPCServerServiceAccount: - """Tests rpc-server service account.""" +class TestAPIServerServiceAccount: + """Tests api-server service account.""" def test_should_add_component_specific_labels(self): docs = render_chart( values={ - "_rpcServer": { - "enabled": True, + "apiServer": { "serviceAccount": {"create": True}, "labels": {"test_label": "test_label_value"}, }, }, - show_only=["templates/rpc-server/rpc-server-serviceaccount.yaml"], + show_only=["templates/api-server/api-server-serviceaccount.yaml"], ) assert "test_label" in jmespath.search("metadata.labels", docs[0]) assert jmespath.search("metadata.labels", docs[0])["test_label"] == "test_label_value" @@ -959,23 +783,21 @@ def test_should_add_component_specific_labels(self): def test_default_automount_service_account_token(self): docs = render_chart( values={ - "_rpcServer": { - "enabled": True, + "apiServer": { "serviceAccount": {"create": True}, }, }, - show_only=["templates/rpc-server/rpc-server-serviceaccount.yaml"], + show_only=["templates/api-server/api-server-serviceaccount.yaml"], ) assert jmespath.search("automountServiceAccountToken", docs[0]) is True def test_overridden_automount_service_account_token(self): docs = render_chart( values={ - "_rpcServer": { - "enabled": True, + "apiServer": { "serviceAccount": {"create": True, "automountServiceAccountToken": False}, }, }, - show_only=["templates/rpc-server/rpc-server-serviceaccount.yaml"], + show_only=["templates/api-server/api-server-serviceaccount.yaml"], ) assert jmespath.search("automountServiceAccountToken", docs[0]) is False diff --git a/helm_tests/security/test_rbac.py b/helm_tests/security/test_rbac.py index 75924684cda98..d163a2ed68373 100644 --- a/helm_tests/security/test_rbac.py +++ b/helm_tests/security/test_rbac.py @@ -34,7 +34,6 @@ ("Service", "test-rbac-postgresql"), ("Service", "test-rbac-statsd"), ("Service", "test-rbac-webserver"), - ("Service", "test-rbac-rpc-server"), ("Service", "test-rbac-flower"), ("Service", "test-rbac-pgbouncer"), ("Service", "test-rbac-redis"), @@ -42,7 +41,6 @@ ("Deployment", "test-rbac-scheduler"), ("Deployment", "test-rbac-statsd"), ("Deployment", "test-rbac-webserver"), - ("Deployment", "test-rbac-rpc-server"), ("Deployment", "test-rbac-flower"), ("Deployment", "test-rbac-pgbouncer"), ("StatefulSet", "test-rbac-postgresql"), @@ -70,7 +68,6 @@ ("ServiceAccount", "test-rbac-cleanup"), ("ServiceAccount", "test-rbac-scheduler"), ("ServiceAccount", "test-rbac-webserver"), - ("ServiceAccount", "test-rbac-rpc-server"), ("ServiceAccount", "test-rbac-worker"), ("ServiceAccount", "test-rbac-triggerer"), ("ServiceAccount", "test-rbac-pgbouncer"), @@ -84,7 +81,7 @@ CUSTOM_SERVICE_ACCOUNT_NAMES = ( (CUSTOM_SCHEDULER_NAME := "TestScheduler"), (CUSTOM_WEBSERVER_NAME := "TestWebserver"), - (CUSTOM_RPC_SERVER_NAME := "TestRPCSserver"), + (CUSTOM_API_SERVER_NAME := "TestAPISserver"), (CUSTOM_WORKER_NAME := "TestWorker"), (CUSTOM_TRIGGERER_NAME := "TestTriggerer"), (CUSTOM_CLEANUP_NAME := "TestCleanup"), @@ -97,6 +94,8 @@ (CUSTOM_POSTGRESQL_NAME := "TestPostgresql"), ) +parametrize_version = pytest.mark.parametrize("version", ["2.3.2", "2.4.0", "3.0.0", "default"]) + class TestRBAC: """Tests RBAC.""" @@ -107,18 +106,27 @@ def _get_values_with_version(self, values, version): return values @staticmethod - def _get_object_tuples(version): + def _get_object_tuples(version, sa: bool = True): tuples = copy(DEPLOYMENT_NO_RBAC_NO_SA_KIND_NAME_TUPLES) - if version == "default": + if version in {"default", "3.0.0"}: tuples.append(("Service", "test-rbac-triggerer")) tuples.append(("StatefulSet", "test-rbac-triggerer")) else: tuples.append(("Deployment", "test-rbac-triggerer")) if version == "2.3.2": tuples.append(("Secret", "test-rbac-result-backend")) + if version == "3.0.0": + tuples.extend( + ( + ("Service", "test-rbac-api-server"), + ("Deployment", "test-rbac-api-server"), + ) + ) + if sa: + tuples.append(("ServiceAccount", "test-rbac-api-server")) return tuples - @pytest.mark.parametrize("version", ["2.3.2", "2.4.0", "default"]) + @parametrize_version def test_deployments_no_rbac_no_sa(self, version): k8s_objects = render_chart( "test-rbac", @@ -141,7 +149,7 @@ def test_deployments_no_rbac_no_sa(self, version): "redis": {"serviceAccount": {"create": False}}, "scheduler": {"serviceAccount": {"create": False}}, "webserver": {"serviceAccount": {"create": False}}, - "_rpcServer": {"enabled": True, "serviceAccount": {"create": False}}, + "apiServer": {"serviceAccount": {"create": False}}, "workers": {"serviceAccount": {"create": False}}, "triggerer": {"serviceAccount": {"create": False}}, "statsd": {"serviceAccount": {"create": False}}, @@ -155,9 +163,9 @@ def test_deployments_no_rbac_no_sa(self, version): list_of_kind_names_tuples = [ (k8s_object["kind"], k8s_object["metadata"]["name"]) for k8s_object in k8s_objects ] - assert sorted(list_of_kind_names_tuples) == sorted(self._get_object_tuples(version)) + assert sorted(list_of_kind_names_tuples) == sorted(self._get_object_tuples(version, sa=False)) - @pytest.mark.parametrize("version", ["2.3.2", "2.4.0", "default"]) + @parametrize_version def test_deployments_no_rbac_with_sa(self, version): k8s_objects = render_chart( "test-rbac", @@ -168,7 +176,6 @@ def test_deployments_no_rbac_with_sa(self, version): "cleanup": {"enabled": True}, "flower": {"enabled": True}, "pgbouncer": {"enabled": True}, - "_rpcServer": {"enabled": True}, }, version=version, ), @@ -179,7 +186,7 @@ def test_deployments_no_rbac_with_sa(self, version): real_list_of_kind_names = self._get_object_tuples(version) + SERVICE_ACCOUNT_NAME_TUPLES assert sorted(list_of_kind_names_tuples) == sorted(real_list_of_kind_names) - @pytest.mark.parametrize("version", ["2.3.2", "2.4.0", "default"]) + @parametrize_version def test_deployments_with_rbac_no_sa(self, version): k8s_objects = render_chart( "test-rbac", @@ -194,7 +201,7 @@ def test_deployments_with_rbac_no_sa(self, version): }, "scheduler": {"serviceAccount": {"create": False}}, "webserver": {"serviceAccount": {"create": False}}, - "_rpcServer": {"enabled": True, "serviceAccount": {"create": False}}, + "apiServer": {"serviceAccount": {"create": False}}, "workers": {"serviceAccount": {"create": False}}, "triggerer": {"serviceAccount": {"create": False}}, "flower": {"enabled": True, "serviceAccount": {"create": False}}, @@ -215,10 +222,10 @@ def test_deployments_with_rbac_no_sa(self, version): list_of_kind_names_tuples = [ (k8s_object["kind"], k8s_object["metadata"]["name"]) for k8s_object in k8s_objects ] - real_list_of_kind_names = self._get_object_tuples(version) + RBAC_ENABLED_KIND_NAME_TUPLES + real_list_of_kind_names = self._get_object_tuples(version, sa=False) + RBAC_ENABLED_KIND_NAME_TUPLES assert sorted(list_of_kind_names_tuples) == sorted(real_list_of_kind_names) - @pytest.mark.parametrize("version", ["2.3.2", "2.4.0", "default"]) + @parametrize_version def test_deployments_with_rbac_with_sa(self, version): k8s_objects = render_chart( "test-rbac", @@ -228,7 +235,6 @@ def test_deployments_with_rbac_with_sa(self, version): "cleanup": {"enabled": True}, "flower": {"enabled": True}, "pgbouncer": {"enabled": True}, - "_rpcServer": {"enabled": True}, }, version=version, ), @@ -245,6 +251,7 @@ def test_service_account_custom_names(self): k8s_objects = render_chart( "test-rbac", values={ + "airflowVersion": "3.0.0", "fullnameOverride": "test-rbac", "cleanup": { "enabled": True, @@ -254,7 +261,7 @@ def test_service_account_custom_names(self): }, "scheduler": {"serviceAccount": {"name": CUSTOM_SCHEDULER_NAME}}, "webserver": {"serviceAccount": {"name": CUSTOM_WEBSERVER_NAME}}, - "_rpcServer": {"enabled": True, "serviceAccount": {"name": CUSTOM_RPC_SERVER_NAME}}, + "apiServer": {"serviceAccount": {"name": CUSTOM_API_SERVER_NAME}}, "workers": {"serviceAccount": {"name": CUSTOM_WORKER_NAME}}, "triggerer": {"serviceAccount": {"name": CUSTOM_TRIGGERER_NAME}}, "flower": {"enabled": True, "serviceAccount": {"name": CUSTOM_FLOWER_NAME}}, @@ -282,6 +289,7 @@ def test_service_account_custom_names_in_objects(self): k8s_objects = render_chart( "test-rbac", values={ + "airflowVersion": "3.0.0", "fullnameOverride": "test-rbac", "cleanup": { "enabled": True, @@ -291,7 +299,7 @@ def test_service_account_custom_names_in_objects(self): }, "scheduler": {"serviceAccount": {"name": CUSTOM_SCHEDULER_NAME}}, "webserver": {"serviceAccount": {"name": CUSTOM_WEBSERVER_NAME}}, - "_rpcServer": {"enabled": True, "serviceAccount": {"name": CUSTOM_RPC_SERVER_NAME}}, + "apiServer": {"serviceAccount": {"name": CUSTOM_API_SERVER_NAME}}, "workers": {"serviceAccount": {"name": CUSTOM_WORKER_NAME}}, "triggerer": {"serviceAccount": {"name": CUSTOM_TRIGGERER_NAME}}, "flower": {"enabled": True, "serviceAccount": {"name": CUSTOM_FLOWER_NAME}}, @@ -327,6 +335,7 @@ def test_service_account_without_resource(self): k8s_objects = render_chart( "test-rbac", values={ + "airflowVersion": "3.0.0", "fullnameOverride": "test-rbac", "executor": "LocalExecutor", "cleanup": {"enabled": False}, @@ -334,7 +343,6 @@ def test_service_account_without_resource(self): "redis": {"enabled": False}, "flower": {"enabled": False}, "statsd": {"enabled": False}, - "_rpcServer": {"enabled": False}, "webserver": {"defaultUser": {"enabled": False}}, }, ) @@ -346,6 +354,7 @@ def test_service_account_without_resource(self): service_account_names = [ "test-rbac-scheduler", "test-rbac-webserver", + "test-rbac-api-server", "test-rbac-triggerer", "test-rbac-migrate-database-job", ] diff --git a/kubernetes_tests/test_base.py b/kubernetes_tests/test_base.py index 98b92b40bb385..27a267b4486c3 100644 --- a/kubernetes_tests/test_base.py +++ b/kubernetes_tests/test_base.py @@ -183,6 +183,10 @@ def monitor_task(self, host, dag_run_id, dag_id, task_id, expected_final_state, if state == expected_final_state: break + if state in {"failed", "upstream_failed", "removed"}: + # If the TI is in failed state (and that's not the state we want) there's no point + # continuing to poll, it won't change + break self._describe_resources(namespace="airflow") self._describe_resources(namespace="default") tries += 1 @@ -215,6 +219,9 @@ def ensure_dag_expected_state(self, host, logical_date, dag_id, expected_final_s if state == expected_final_state: break + if state == "failed": + # If the DR is in failed state there's no point continuing to poll! + break self._describe_resources("airflow") self._describe_resources("default") tries += 1 diff --git a/kubernetes_tests/test_other_executors.py b/kubernetes_tests/test_other_executors.py index 638ca49ad445c..63fe802054802 100644 --- a/kubernetes_tests/test_other_executors.py +++ b/kubernetes_tests/test_other_executors.py @@ -54,6 +54,10 @@ def test_integration_run_dag(self): timeout=300, ) + @pytest.mark.xfail( + EXECUTOR == "LocalExecutor", + reason="https://github.com/apache/airflow/issues/44481 needs to be implemented", + ) def test_integration_run_dag_with_scheduler_failure(self): dag_id = "example_xcom"