From 86d42e1956279d49a333584da494fc4a0974d5a9 Mon Sep 17 00:00:00 2001
From: Ulada Zakharava
Date: Mon, 12 Feb 2024 15:13:29 +0000
Subject: [PATCH] Add GKECreateCustomResource operator

---
 .../cncf/kubernetes/operators/resource.py     |  18 ++-
 .../google/cloud/hooks/kubernetes_engine.py   |  67 ++++++++++
 .../cloud/operators/kubernetes_engine.py      | 120 ++++++++++++++++++
 3 files changed, 199 insertions(+), 6 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/resource.py b/airflow/providers/cncf/kubernetes/operators/resource.py
index 003460d8377ff..699be80a054e5 100644
--- a/airflow/providers/cncf/kubernetes/operators/resource.py
+++ b/airflow/providers/cncf/kubernetes/operators/resource.py
@@ -109,16 +109,22 @@ class KubernetesCreateResourceOperator(KubernetesResourceBaseOperator):
 
     def create_custom_from_yaml_object(self, body: dict):
         group, version, namespace, plural = self.get_crd_fields(body)
-        self.custom_object_client.create_namespaced_custom_object(group, version, namespace, plural, body)
+        self.custom_object_client.create_namespaced_custom_object(
+            group=group, version=version, namespace=namespace, plural=plural, body=body
+        )
 
     def execute(self, context) -> None:
         resources = yaml.safe_load_all(self.yaml_conf)
 
         if not self.custom_resource_definition:
-            create_from_yaml(
-                k8s_client=self.client,
-                yaml_objects=resources,
-                namespace=self.get_namespace(),
-            )
+            try:
+                create_from_yaml(
+                    k8s_client=self.client,
+                    yaml_objects=resources,
+                    verbose=True,
+                    namespace=self.get_namespace(),
+                )
+            except Exception as exc:
+                self.log.error("Failed to create resources from YAML: %s", exc)
         else:
             k8s_resource_iterator(self.create_custom_from_yaml_object, resources)

diff --git a/airflow/providers/google/cloud/hooks/kubernetes_engine.py b/airflow/providers/google/cloud/hooks/kubernetes_engine.py
index 4fa97372f6a3f..27adb390cf05e 100644
--- a/airflow/providers/google/cloud/hooks/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/hooks/kubernetes_engine.py
@@ -419,6 +419,72 @@ def check_kueue_deployment_running(self, name, namespace):
         raise AirflowException("Deployment timed out")
 
 
+class GKECustomResourceHook(GoogleBaseHook, KubernetesHook):
+    """Google Kubernetes Engine Custom Resource APIs."""
+
+    def __init__(
+        self,
+        cluster_url: str,
+        ssl_ca_cert: str,
+        *args,
+        **kwargs,
+    ):
+        super().__init__(*args, **kwargs)
+        self._cluster_url = cluster_url
+        self._ssl_ca_cert = ssl_ca_cert
+
+    @cached_property
+    def api_client(self) -> client.ApiClient:
+        """Return an authenticated Kubernetes API client."""
+        return self.get_conn()
+
+    @cached_property
+    def core_v1_client(self) -> client.CoreV1Api:
+        return client.CoreV1Api(self.api_client)
+
+    @cached_property
+    def batch_v1_client(self) -> client.BatchV1Api:
+        return client.BatchV1Api(self.api_client)
+
+    @cached_property
+    def apps_v1_client(self) -> client.AppsV1Api:
+        return client.AppsV1Api(api_client=self.api_client)
+
+    @cached_property
+    def custom_object_client(self) -> client.CustomObjectsApi:
+        """Return a Kubernetes CustomObjectsApi client."""
+        return client.CustomObjectsApi(api_client=self.api_client)
+
+    def get_conn(self) -> client.ApiClient:
+        configuration = self._get_config()
+        configuration.refresh_api_key_hook = self._refresh_api_key_hook
+        return client.ApiClient(configuration)
+
+    def _refresh_api_key_hook(self, configuration: client.configuration.Configuration):
+        configuration.api_key = {"authorization": self._get_token(self.get_credentials())}
+
+    def _get_config(self) -> client.configuration.Configuration:
+        configuration = client.Configuration(
+            host=self._cluster_url,
+            api_key_prefix={"authorization": "Bearer"},
+            api_key={"authorization": self._get_token(self.get_credentials())},
+        )
+        configuration.ssl_ca_cert = FileOrData(
+            {
+                "certificate-authority-data": self._ssl_ca_cert,
+            },
+            file_key_name="certificate-authority",
+        ).as_file()
+        return configuration
+
+    @staticmethod
+    def _get_token(creds: google.auth.credentials.Credentials) -> str:
+        if creds.token is None or creds.expired:
+            auth_req = google_requests.Request()
+            creds.refresh(auth_req)
+        return creds.token
+
+
 class GKEAsyncHook(GoogleBaseAsyncHook):
     """Asynchronous client of GKE."""
 
@@ -523,6 +589,7 @@ def get_conn(self) -> client.ApiClient:
 
         if self.disable_tcp_keepalive is not True:
             _enable_tcp_keepalive()
 
+        self.log.debug("Creating Kubernetes API client.")
         return client.ApiClient(configuration)
 

diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py b/airflow/providers/google/cloud/operators/kubernetes_engine.py
index 204107c0235f2..ab022fa30c782 100644
--- a/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -29,13 +29,16 @@
 from google.api_core.exceptions import AlreadyExists
 from google.cloud.container_v1.types import Cluster
+from kubernetes import client
 from kubernetes.utils.create_from_yaml import FailToCreateError
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
 from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
+from airflow.providers.cncf.kubernetes.operators.resource import KubernetesCreateResourceOperator
 from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
 from airflow.providers.google.cloud.hooks.kubernetes_engine import (
+    GKECustomResourceHook,
     GKEDeploymentHook,
     GKEHook,
     GKEJobHook,
@@ -898,3 +901,120 @@ def execute(self, context: Context):
         ).fetch_cluster_info()
 
         return super().execute(context)
+
+
+class GKECreateCustomResourceOperator(KubernetesCreateResourceOperator):
+    """
+    Create a custom resource in the specified Google Kubernetes Engine cluster.
+
+    This Operator assumes that the system has gcloud installed and has configured a
+    connection id with a service account.
+
+    .. seealso::
+        For more detail about Kubernetes Engine authentication have a look at the reference:
+        https://cloud.google.com/kubernetes-engine/docs/how-to/cluster-access-for-kubectl#internal_ip
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GKECreateCustomResourceOperator`
+
+    :param location: The name of the Google Kubernetes Engine zone or region in which the
+        cluster resides, e.g. 'us-central1-a'
+    :param cluster_name: The name of the Google Kubernetes Engine cluster in which the
+        custom resource should be created
+    :param use_internal_ip: Use the internal IP address as the endpoint.
+    :param project_id: The Google Developers Console project id
+    :param gcp_conn_id: The Google Cloud connection id to use. This allows for
+        users to specify a service account.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields: Sequence[str] = tuple(
+        {"project_id", "location", "cluster_name"}
+        | set(KubernetesCreateResourceOperator.template_fields)
+    )
+
+    def __init__(
+        self,
+        *,
+        location: str,
+        cluster_name: str,
+        use_internal_ip: bool = False,
+        project_id: str | None = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.location = location
+        self.cluster_name = cluster_name
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.use_internal_ip = use_internal_ip
+
+        self._ssl_ca_cert: str | None = None
+        self._cluster_url: str | None = None
+
+        if self.gcp_conn_id is None:
+            raise AirflowException(
+                "The gcp_conn_id parameter has become required. If you want to use Application Default "
+                "Credentials (ADC) strategy for authorization, create an empty connection "
+                "called `google_cloud_default`.",
+            )
+        # There is no need to manage the kube_config file, as it will be generated automatically.
+        # All Kubernetes parameters (except config_file) are also valid for the GKECreateCustomResourceOperator.
+        if self.config_file:
+            raise AirflowException("config_file is not an allowed parameter for the GKECreateCustomResourceOperator.")
+
+    @staticmethod
+    @deprecated(
+        reason="Please use `fetch_cluster_info` instead to get the cluster info for connecting to it.",
+        category=AirflowProviderDeprecationWarning,
+    )
+    def get_gke_config_file():
+        pass
+
+    @cached_property
+    def cluster_hook(self) -> GKEHook:
+        return GKEHook(
+            gcp_conn_id=self.gcp_conn_id,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    @cached_property
+    def hook(self) -> GKECustomResourceHook:
+        return GKECustomResourceHook(
+            gcp_conn_id=self.gcp_conn_id,
+            cluster_url=self._cluster_url,
+            ssl_ca_cert=self._ssl_ca_cert,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    @cached_property
+    def client(self) -> client.ApiClient:
+        if self._cluster_url is None or self._ssl_ca_cert is None:
+            raise AttributeError(
+                "Cluster url and ssl_ca_cert should be defined before using self.client. "
+                "They are fetched by GKEClusterAuthDetails in the execute method.",
+            )
+        return self.hook.api_client
+
+    def execute(self, context: Context):
+        """Create the custom resource defined in the YAML configuration in the GKE cluster."""
+        # Fetch the cluster endpoint and CA certificate before the base operator builds its client.
+        self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+            use_internal_ip=self.use_internal_ip,
+            cluster_hook=self.cluster_hook,
+        ).fetch_cluster_info()
+        return super().execute(context)
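
For reviewers, a rough usage sketch (not part of the patch) of how the new GKECreateCustomResourceOperator might be wired into a DAG once this change lands. The project id, location, cluster name, and the PersistentVolumeClaim manifest below are placeholders; yaml_conf is the field inherited from KubernetesCreateResourceOperator.

# Illustrative only: placeholder GCP project/cluster values; any Kubernetes manifest
# can be passed as a YAML string via yaml_conf.
from __future__ import annotations

import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.kubernetes_engine import GKECreateCustomResourceOperator

PVC_CONF = """
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: test-pvc
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
"""

with DAG(
    dag_id="example_gke_create_custom_resource",
    start_date=datetime.datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    create_resource_task = GKECreateCustomResourceOperator(
        task_id="create_resource_task",
        project_id="my-project",      # placeholder
        location="us-central1-a",     # placeholder
        cluster_name="my-cluster",    # placeholder
        yaml_conf=PVC_CONF,
    )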