Skip to content

Commit

Permalink
Add GKECreateCustomResource operator
Browse files Browse the repository at this point in the history
  • Loading branch information
VladaZakharova authored and MaksYermak committed Feb 16, 2024
1 parent c75a105 commit 86d42e1
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 6 deletions.
18 changes: 12 additions & 6 deletions airflow/providers/cncf/kubernetes/operators/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,22 @@ class KubernetesCreateResourceOperator(KubernetesResourceBaseOperator):

def create_custom_from_yaml_object(self, body: dict):
    """
    Create a single namespaced custom object from a parsed YAML document.

    :param body: The custom resource manifest as a dictionary. The API group, version,
        namespace and plural name are derived from it via ``get_crd_fields``.
    """
    group, version, namespace, plural = self.get_crd_fields(body)
    # Issue exactly one create request. Keyword arguments keep the call robust against
    # changes in the positional order of the client method's parameters.
    self.custom_object_client.create_namespaced_custom_object(
        group=group,
        version=version,
        namespace=namespace,
        plural=plural,
        body=body,
    )

def execute(self, context) -> None:
    """
    Create the resources described by ``self.yaml_conf``.

    When ``self.custom_resource_definition`` is falsy, the parsed YAML documents are
    handed to ``create_from_yaml``; otherwise each document is created as a custom
    object through ``create_custom_from_yaml_object``.

    :param context: Airflow task context (unused here).
    :raises Exception: Any error raised while creating the resources is logged and
        re-raised so that the task fails instead of silently succeeding.
    """
    # safe_load_all returns a lazy iterator, so it can only be consumed once.
    resources = yaml.safe_load_all(self.yaml_conf)

    if self.custom_resource_definition:
        k8s_resource_iterator(self.create_custom_from_yaml_object, resources)
        return

    try:
        create_from_yaml(
            k8s_client=self.client,
            yaml_objects=resources,
            verbose=True,
            namespace=self.get_namespace(),
        )
    except Exception as exc:
        # Do not swallow the failure: a resource that was not created must fail the task.
        self.log.error("Failed to create Kubernetes resources from YAML: %s", exc)
        raise

Expand Down
67 changes: 67 additions & 0 deletions airflow/providers/google/cloud/hooks/kubernetes_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,72 @@ def check_kueue_deployment_running(self, name, namespace):
raise AirflowException("Deployment timed out")


class GKECustomResourceHook(GoogleBaseHook, KubernetesHook):
    """
    Google Kubernetes Engine Custom Resource APIs.

    Builds Kubernetes API clients that connect to the cluster at ``cluster_url`` and
    authenticate with a bearer token obtained from the Google credentials of this hook.

    :param cluster_url: Address of the cluster's Kubernetes API server; used as the
        client ``host``.
    :param ssl_ca_cert: Certificate authority data of the cluster, passed to the client
        configuration as ``certificate-authority-data`` (presumably base64-encoded, as
        in a kubeconfig -- confirm against the caller).
    """

    def __init__(
        self,
        cluster_url: str,
        ssl_ca_cert: str,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self._cluster_url = cluster_url
        self._ssl_ca_cert = ssl_ca_cert

    @cached_property
    def api_client(self) -> client.ApiClient:
        """Return the API client from ``get_conn``, created once and cached on the hook."""
        return self.get_conn()

    @cached_property
    def core_v1_client(self) -> client.CoreV1Api:
        """Return a cached ``CoreV1Api`` bound to ``api_client``."""
        return client.CoreV1Api(api_client=self.api_client)

    @cached_property
    def batch_v1_client(self) -> client.BatchV1Api:
        """Return a cached ``BatchV1Api`` bound to ``api_client``."""
        return client.BatchV1Api(api_client=self.api_client)

    @cached_property
    def apps_v1_client(self) -> client.AppsV1Api:
        """Return a cached ``AppsV1Api`` bound to ``api_client``."""
        return client.AppsV1Api(api_client=self.api_client)

    @cached_property
    def custom_object_client(self) -> client.CustomObjectsApi:
        """Return a cached ``CustomObjectsApi`` bound to ``api_client``."""
        return client.CustomObjectsApi(api_client=self.api_client)

    def get_conn(self) -> client.ApiClient:
        """Create a new API client whose configuration refreshes its token via a hook."""
        configuration = self._get_config()
        # Registered so the client configuration can replace the bearer token through
        # _refresh_api_key_hook instead of keeping the one issued at construction time.
        configuration.refresh_api_key_hook = self._refresh_api_key_hook
        return client.ApiClient(configuration)

    def _refresh_api_key_hook(self, configuration: client.configuration.Configuration):
        """Replace the ``authorization`` API key on ``configuration`` with a current token."""
        configuration.api_key = {"authorization": self._get_token(self.get_credentials())}

    def _get_config(self) -> client.configuration.Configuration:
        """Build the client configuration: cluster host, bearer token and CA certificate."""
        configuration = client.Configuration(
            host=self._cluster_url,
            api_key_prefix={"authorization": "Bearer"},
            api_key={"authorization": self._get_token(self.get_credentials())},
        )
        # The client expects a path to a CA file, so the in-memory certificate data is
        # converted to a file path through FileOrData.as_file().
        configuration.ssl_ca_cert = FileOrData(
            {
                "certificate-authority-data": self._ssl_ca_cert,
            },
            file_key_name="certificate-authority",
        ).as_file()
        return configuration

    @staticmethod
    def _get_token(creds: google.auth.credentials.Credentials) -> str:
        """Return the token of ``creds``, refreshing the credentials first if it is missing or expired."""
        if creds.token is None or creds.expired:
            auth_req = google_requests.Request()
            creds.refresh(auth_req)
        return creds.token


class GKEAsyncHook(GoogleBaseAsyncHook):
"""Asynchronous client of GKE."""

Expand Down Expand Up @@ -523,6 +589,7 @@ def get_conn(self) -> client.ApiClient:

if self.disable_tcp_keepalive is not True:
_enable_tcp_keepalive()
self.log.info("in get_conn")

return client.ApiClient(configuration)

Expand Down
120 changes: 120 additions & 0 deletions airflow/providers/google/cloud/operators/kubernetes_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@
from google.api_core.exceptions import AlreadyExists
from google.cloud.container_v1.types import Cluster
from kubernetes.utils.create_from_yaml import FailToCreateError
from kubernetes import client

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.operators.resource import KubernetesCreateResourceOperator
from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
from airflow.providers.google.cloud.hooks.kubernetes_engine import (
GKECustomResourceHook,
GKEDeploymentHook,
GKEHook,
GKEJobHook,
Expand Down Expand Up @@ -898,3 +901,120 @@ def execute(self, context: Context):
).fetch_cluster_info()

return super().execute(context)


class GKECreateCustomResourceOperator(KubernetesCreateResourceOperator):
    """
    Create a resource in the specified Google Kubernetes Engine cluster.

    The cluster endpoint and CA certificate are fetched from GKE at execution time, and the
    resource creation itself is delegated to ``KubernetesCreateResourceOperator``.
    This Operator assumes that a connection id with a service account has been configured.

    .. seealso::
        For more detail about Kubernetes Engine authentication have a look at the reference:
        https://cloud.google.com/kubernetes-engine/docs/how-to/cluster-access-for-kubectl#internal_ip

    :param location: The name of the Google Kubernetes Engine zone or region in which the
        cluster resides, e.g. 'us-central1-a'
    :param cluster_name: The name of the Google Kubernetes Engine cluster in which the
        resource should be created
    :param use_internal_ip: Use the internal IP address as the endpoint.
    :param project_id: The Google Developers Console project id
    :param gcp_conn_id: The Google cloud connection id to use. This allows for
        users to specify a service account.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account.
    :param kwargs: Remaining keyword arguments are forwarded to
        ``KubernetesCreateResourceOperator``.
    """

    # TODO(review): ``project_id``, ``location`` and ``cluster_name`` are not declared as
    # template fields here; decide whether they should extend the parent's template_fields.

    def __init__(
        self,
        *,
        location: str,
        cluster_name: str,
        use_internal_ip: bool = False,
        project_id: str | None = None,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.project_id = project_id
        self.location = location
        self.cluster_name = cluster_name
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain
        self.use_internal_ip = use_internal_ip

        # Populated in execute() from the cluster info; required by the ``client`` property.
        self._ssl_ca_cert: str | None = None
        self._cluster_url: str | None = None

        if self.gcp_conn_id is None:
            raise AirflowException(
                "The gcp_conn_id parameter has become required. If you want to use Application Default "
                "Credentials (ADC) strategy for authorization, create an empty connection "
                "called `google_cloud_default`.",
            )
        # There is no need to manage the kube_config file, as the connection details are
        # fetched from the cluster itself. config_file is therefore rejected by this operator.
        if self.config_file:
            raise AirflowException(
                "config_file is not an allowed parameter for the GKECreateCustomResourceOperator."
            )

    @staticmethod
    @deprecated(
        reason="Please use `fetch_cluster_info` instead to get the cluster info for connecting to it.",
        category=AirflowProviderDeprecationWarning,
    )
    def get_gke_config_file():
        """Do nothing; deprecated placeholder, see the deprecation reason above."""
        pass

    @cached_property
    def cluster_hook(self) -> GKEHook:
        """Return the cached ``GKEHook`` passed to ``GKEClusterAuthDetails`` in ``execute``."""
        return GKEHook(
            gcp_conn_id=self.gcp_conn_id,
            location=self.location,
            impersonation_chain=self.impersonation_chain,
        )

    @cached_property
    def hook(self) -> GKECustomResourceHook:
        """
        Return the cached hook that talks to the Kubernetes API of the cluster.

        The hook captures ``_cluster_url`` and ``_ssl_ca_cert`` at creation and is cached
        afterwards, so it must only be accessed once ``execute`` has set those values.
        """
        return GKECustomResourceHook(
            gcp_conn_id=self.gcp_conn_id,
            cluster_url=self._cluster_url,
            ssl_ca_cert=self._ssl_ca_cert,
            impersonation_chain=self.impersonation_chain,
        )

    @cached_property
    def client(self) -> client.ApiClient:
        """
        Return the Kubernetes API client of ``hook``.

        :raises AttributeError: If the cluster url or CA certificate has not been set yet.
        """
        if self._cluster_url is None or self._ssl_ca_cert is None:
            raise AttributeError(
                "Cluster url and ssl_ca_cert should be defined before using self.client. "
                "They are fetched from the cluster in self.execute.",
            )
        return self.hook.api_client

    def execute(self, context: Context):
        """Fetch the cluster connection details, then create the resources from the YAML configuration."""
        self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
            cluster_name=self.cluster_name,
            project_id=self.project_id,
            use_internal_ip=self.use_internal_ip,
            cluster_hook=self.cluster_hook,
        ).fetch_cluster_info()
        return super().execute(context)

0 comments on commit 86d42e1

Please sign in to comment.