Skip to content

Commit

Permalink
Add GKEListJobsOperator and GKEDescribeJobOperator
Browse files Browse the repository at this point in the history
  • Loading branch information
VladaZakharova committed Feb 21, 2024
1 parent 011cd3d commit 64a297a
Show file tree
Hide file tree
Showing 7 changed files with 437 additions and 0 deletions.
25 changes: 25 additions & 0 deletions airflow/providers/cncf/kubernetes/hooks/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from airflow.utils import yaml

if TYPE_CHECKING:
from kubernetes.client import V1JobList
from kubernetes.client.models import V1Deployment, V1Job, V1Pod

LOADING_KUBE_CONFIG_FILE_RESOURCE = "Loading Kubernetes configuration file kube_config from {}..."
Expand Down Expand Up @@ -502,6 +503,30 @@ def create_job(
raise e
return resp

def get_job(self, job_name: str, namespace: str) -> V1Job:
"""Get Job of specified name from Google Cloud.
:param job_name: Name of Job to fetch.
:param namespace: Namespace of the Job.
:return: Job object
"""
return self.batch_v1_client.read_namespaced_job(name=job_name, namespace=namespace, pretty=True)

def list_jobs_all_namespaces(self) -> V1JobList:
"""Get list of Jobs from all namespaces.
:return: V1JobList object
"""
return self.batch_v1_client.list_job_for_all_namespaces(pretty=True)

def list_jobs_from_namespace(self, namespace: str) -> V1JobList:
"""Get list of Jobs from dedicated namespace.
:param namespace: Namespace of the Job.
:return: V1JobList object
"""
return self.batch_v1_client.list_namespaced_job(namespace=namespace, pretty=True)


def _get_bool(val) -> bool | None:
"""Convert val to bool if can be done with certainty; if we cannot infer intention we return None."""
Expand Down
29 changes: 29 additions & 0 deletions airflow/providers/google/cloud/links/kubernetes_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
KUBERNETES_BASE_LINK
+ "/job/{location}/{cluster_name}/{namespace}/{job_name}/details?project={project_id}"
)
KUBERNETES_WORKLOADS_LINK = (
KUBERNETES_BASE_LINK
+ '/workload/overview?project={project_id}&pageState=("savedViews":'
'("c":%5B"gke%2F{location}%2F{cluster_name}"%5D,"n":%5B"{namespace}"%5D))'
)


class KubernetesEngineClusterLink(BaseGoogleLink):
Expand Down Expand Up @@ -111,3 +116,27 @@ def persist(
"project_id": task_instance.project_id,
},
)


class KubernetesEngineWorkloadsLink(BaseGoogleLink):
"""Helper class for constructing Kubernetes Engine Workloads Link."""

name = "Kubernetes Workloads"
key = "kubernetes_workloads_conf"
format_str = KUBERNETES_WORKLOADS_LINK

@staticmethod
def persist(
context: Context,
task_instance,
):
task_instance.xcom_push(
context=context,
key=KubernetesEngineWorkloadsLink.key,
value={
"location": task_instance.location,
"cluster_name": task_instance.cluster_name,
"namespace": task_instance.namespace,
"project_id": task_instance.project_id,
},
)
230 changes: 230 additions & 0 deletions airflow/providers/google/cloud/operators/kubernetes_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
KubernetesEngineClusterLink,
KubernetesEngineJobLink,
KubernetesEnginePodLink,
KubernetesEngineWorkloadsLink,
)
from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
from airflow.providers.google.cloud.triggers.kubernetes_engine import GKEOperationTrigger, GKEStartPodTrigger
Expand Down Expand Up @@ -898,3 +899,232 @@ def execute(self, context: Context):
).fetch_cluster_info()

return super().execute(context)


class GKEDescribeJobOperator(GoogleCloudBaseOperator):
"""
Retrieve information about Job by given name.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:GKEDescribeJobOperator`
:param job_name: The name of the resource to delete, in this case cluster name.
:param project_id: The Google Developers Console project id.
:param location: The name of the Google Kubernetes Engine zone or region in which the cluster
resides.
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param namespace: The name of the Google Kubernetes Engine namespace.
:param use_internal_ip: Use the internal IP address as the endpoint.
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
If set as a string, the account must grant the originating account
the Service Account Token Creator IAM role.
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
"""

template_fields: Sequence[str] = (
"project_id",
"gcp_conn_id",
"job_name",
"namespace",
"cluster_name",
"location",
"impersonation_chain",
)
operator_extra_links = (KubernetesEngineJobLink(),)

def __init__(
self,
*,
job_name: str,
location: str,
namespace: str,
cluster_name: str,
project_id: str | None = None,
use_internal_ip: bool = False,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)

self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
self.location = location
self.job_name = job_name
self.namespace = namespace
self.cluster_name = cluster_name
self.use_internal_ip = use_internal_ip
self.impersonation_chain = impersonation_chain

self.job: V1Job | None = None

self._ssl_ca_cert: str | None = None
self._cluster_url: str | None = None

if self.gcp_conn_id is None:
raise AirflowException(
"The gcp_conn_id parameter has become required. If you want to use Application Default "
"Credentials (ADC) strategy for authorization, create an empty connection "
"called `google_cloud_default`.",
)

@cached_property
def cluster_hook(self) -> GKEHook:
return GKEHook(
gcp_conn_id=self.gcp_conn_id,
location=self.location,
impersonation_chain=self.impersonation_chain,
)

@cached_property
def hook(self) -> GKEJobHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Try to use self.get_kube_creds method",
)

hook = GKEJobHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
)
return hook

def execute(self, context: Context) -> None:
self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
cluster_name=self.cluster_name,
project_id=self.project_id,
use_internal_ip=self.use_internal_ip,
cluster_hook=self.cluster_hook,
).fetch_cluster_info()

self.job = self.hook.get_job(job_name=self.job_name, namespace=self.namespace)
self.log.info(
"Retrieved description of Job %s from cluster %s:\n %s",
self.job_name,
self.cluster_name,
self.job,
)
ti = context["ti"]
ti.xcom_push(key="job_name", value=self.job.metadata.name)
ti.xcom_push(key="job_namespace", value=self.job.metadata.namespace)
KubernetesEngineJobLink.persist(context=context, task_instance=self)
return None


class GKEListJobsOperator(GoogleCloudBaseOperator):
"""
Retrieve list of Jobs.
If namespace parameter is specified, the list of Jobs from dedicated
namespace will be retrieved. If no namespace specified, it will output Jobs from all namespaces.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:GKEListJobsOperator`
:param project_id: The Google Developers Console project id.
:param location: The name of the Google Kubernetes Engine zone or region in which the cluster
resides.
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param namespace: The name of the Google Kubernetes Engine namespace.
:param use_internal_ip: Use the internal IP address as the endpoint.
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
If set as a string, the account must grant the originating account
the Service Account Token Creator IAM role.
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
"""

template_fields: Sequence[str] = (
"project_id",
"gcp_conn_id",
"namespace",
"cluster_name",
"location",
"impersonation_chain",
)
operator_extra_links = (KubernetesEngineWorkloadsLink(),)

def __init__(
self,
*,
location: str,
cluster_name: str,
namespace: str | None = None,
project_id: str | None = None,
use_internal_ip: bool = False,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)

self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
self.location = location
self.namespace = namespace
self.cluster_name = cluster_name
self.use_internal_ip = use_internal_ip
self.impersonation_chain = impersonation_chain

self._ssl_ca_cert: str | None = None
self._cluster_url: str | None = None

if self.gcp_conn_id is None:
raise AirflowException(
"The gcp_conn_id parameter has become required. If you want to use Application Default "
"Credentials (ADC) strategy for authorization, create an empty connection "
"called `google_cloud_default`.",
)

@cached_property
def cluster_hook(self) -> GKEHook:
return GKEHook(
gcp_conn_id=self.gcp_conn_id,
location=self.location,
impersonation_chain=self.impersonation_chain,
)

@cached_property
def hook(self) -> GKEJobHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Try to use self.get_kube_creds method",
)

hook = GKEJobHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
)
return hook

def execute(self, context: Context) -> None:
self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
cluster_name=self.cluster_name,
project_id=self.project_id,
use_internal_ip=self.use_internal_ip,
cluster_hook=self.cluster_hook,
).fetch_cluster_info()

if self.namespace:
jobs = self.hook.list_jobs_from_namespace(namespace=self.namespace)
else:
jobs = self.hook.list_jobs_all_namespaces()
for job in jobs.items:
self.log.info("Retrieved description of Job:\n %s", job)
KubernetesEngineWorkloadsLink.persist(context=context, task_instance=self)
return None
1 change: 1 addition & 0 deletions airflow/providers/google/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,7 @@ extra-links:
- airflow.providers.google.cloud.links.kubernetes_engine.KubernetesEngineClusterLink
- airflow.providers.google.cloud.links.kubernetes_engine.KubernetesEnginePodLink
- airflow.providers.google.cloud.links.kubernetes_engine.KubernetesEngineJobLink
- airflow.providers.google.cloud.links.kubernetes_engine.KubernetesEngineWorkloadsLink
- airflow.providers.google.cloud.links.pubsub.PubSubSubscriptionLink
- airflow.providers.google.cloud.links.pubsub.PubSubTopicLink
- airflow.providers.google.cloud.links.cloud_memorystore.MemcachedInstanceDetailsLink
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,37 @@ All Kubernetes parameters (except ``config_file``) are also valid for the ``GKES
:start-after: [START howto_operator_gke_start_job]
:end-before: [END howto_operator_gke_start_job]


.. _howto/operator:GKEDescribeJobOperator:

Retrieve information about Job by given name
""""""""""""""""""""""""""""""""""""""""""""

You can use :class:`~airflow.providers.google.cloud.operators.kubernetes_engine.GKEDescribeJobOperator` to retrieve
detailed description of existing Job by providing its name and namespace.

.. exampleinclude:: /../../tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gke_describe_job]
:end-before: [END howto_operator_gke_describe_job]


.. _howto/operator:GKEListJobsOperator:

Retrieve list of Jobs
"""""""""""""""""""""

You can use :class:`~airflow.providers.google.cloud.operators.kubernetes_engine.GKEListJobsOperator` to retrieve
list of existing Jobs. If ``namespace`` parameter is provided, output will include Jobs across given namespace.
If ``namespace`` parameter is not specified, the information across all the namespaces will be outputted.

.. exampleinclude:: /../../tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gke_list_jobs]
:end-before: [END howto_operator_gke_list_jobs]

Reference
^^^^^^^^^

Expand Down
Loading

0 comments on commit 64a297a

Please sign in to comment.