Skip to content

Commit

Permalink
Remove compat code for 2.7.0 - its now the min Airflow version (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
jedcunningham authored May 13, 2024
1 parent 3e229d8 commit e3897dc
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 80 deletions.
44 changes: 13 additions & 31 deletions airflow/providers/celery/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,37 +37,19 @@
from packaging.version import Version

from airflow import __version__ as airflow_version

try:
from airflow.cli.cli_config import (
ARG_DAEMON,
ARG_LOG_FILE,
ARG_PID,
ARG_SKIP_SERVE_LOGS,
ARG_STDERR,
ARG_STDOUT,
ARG_VERBOSE,
ActionCommand,
Arg,
GroupCommand,
lazy_load_command,
)
except ImportError:
import packaging.version

from airflow.exceptions import AirflowOptionalProviderFeatureException

base_version = packaging.version.parse(airflow_version).base_version

if packaging.version.parse(base_version) < packaging.version.parse("2.7.0"):
raise AirflowOptionalProviderFeatureException(
"Celery Executor from Celery Provider should only be used with Airflow 2.7.0+.\n"
f"This is Airflow {airflow_version} and Celery and CeleryKubernetesExecutor are "
f"available in the 'airflow.executors' package. You should not use "
f"the provider's executors in this version of Airflow."
)
raise

from airflow.cli.cli_config import (
ARG_DAEMON,
ARG_LOG_FILE,
ARG_PID,
ARG_SKIP_SERVE_LOGS,
ARG_STDERR,
ARG_STDOUT,
ARG_VERBOSE,
ActionCommand,
Arg,
GroupCommand,
lazy_load_command,
)
from airflow.configuration import conf
from airflow.exceptions import AirflowTaskTimeout
from airflow.executors.base_executor import BaseExecutor
Expand Down
46 changes: 14 additions & 32 deletions airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,38 +38,18 @@
from kubernetes.dynamic import DynamicClient
from sqlalchemy import select, update

from airflow.providers.cncf.kubernetes.pod_generator import PodMutationHookException, PodReconciliationError
from airflow.stats import Stats

try:
from airflow.cli.cli_config import (
ARG_DAG_ID,
ARG_EXECUTION_DATE,
ARG_OUTPUT_PATH,
ARG_SUBDIR,
ARG_VERBOSE,
ActionCommand,
Arg,
GroupCommand,
lazy_load_command,
positive_int,
)
except ImportError:
import packaging.version

from airflow import __version__ as airflow_version
from airflow.exceptions import AirflowOptionalProviderFeatureException

base_version = packaging.version.parse(airflow_version).base_version

if packaging.version.parse(base_version) < packaging.version.parse("2.7.0"):
raise AirflowOptionalProviderFeatureException(
"Kubernetes Executor from CNCF Provider should only be used with Airflow 2.7.0+.\n"
f"This is Airflow {airflow_version} and Kubernetes and CeleryKubernetesExecutor are "
f"available in the 'airflow.executors' package. You should not use "
f"the provider's executors in this version of Airflow."
)
raise
from airflow.cli.cli_config import (
ARG_DAG_ID,
ARG_EXECUTION_DATE,
ARG_OUTPUT_PATH,
ARG_SUBDIR,
ARG_VERBOSE,
ActionCommand,
Arg,
GroupCommand,
lazy_load_command,
positive_int,
)
from airflow.configuration import conf
from airflow.executors.base_executor import BaseExecutor
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
Expand All @@ -78,6 +58,8 @@
)
from airflow.providers.cncf.kubernetes.kube_config import KubeConfig
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import annotations_to_key
from airflow.providers.cncf.kubernetes.pod_generator import PodMutationHookException, PodReconciliationError
from airflow.stats import Stats
from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.log.logging_mixin import remove_escape_codes
from airflow.utils.session import NEW_SESSION, provide_session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import multiprocessing
import time
from queue import Empty, Queue
from typing import TYPE_CHECKING, Any, Generic, TypeVar
from typing import TYPE_CHECKING, Any

from kubernetes import client, watch
from kubernetes.client.rest import ApiException
Expand All @@ -36,6 +36,7 @@
)
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.singleton import Singleton
from airflow.utils.state import TaskInstanceState

try:
Expand All @@ -60,22 +61,6 @@
KubernetesWatchType,
)

# Singleton here is duplicated version of airflow.utils.singleton.Singleton until
# min-airflow version is 2.7.0 for the provider. then it can be imported from airflow.utils.singleton.

T = TypeVar("T")


class Singleton(type, Generic[T]):
"""Metaclass that allows to implement singleton pattern."""

_instances: dict[Singleton[T], T] = {}

def __call__(cls: Singleton[T], *args, **kwargs) -> T:
if cls not in cls._instances:
cls._instances[cls] = super().__call__(*args, **kwargs)
return cls._instances[cls]


class ResourceVersion(metaclass=Singleton):
"""Singleton for tracking resourceVersion from Kubernetes."""
Expand Down

0 comments on commit e3897dc

Please sign in to comment.