Skip to content

Commit

Permalink
Create DeleteKubernetesJobOperator and GKEDeleteJobOperator operators (
Browse files Browse the repository at this point in the history
  • Loading branch information
MaksYermak authored Mar 18, 2024
1 parent 884852a commit 29ac05f
Show file tree
Hide file tree
Showing 8 changed files with 389 additions and 9 deletions.
71 changes: 71 additions & 0 deletions airflow/providers/cncf/kubernetes/operators/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@

from kubernetes.client import BatchV1Api, models as k8s
from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
add_unique_suffix,
Expand Down Expand Up @@ -306,3 +308,72 @@ def reconcile_job_specs(
return merge_objects(base_spec, client_spec)

return None


class KubernetesDeleteJobOperator(BaseOperator):
    """
    Delete a Kubernetes Job.

    A missing Job (HTTP 404 from the API server) is treated as success and
    only logged, so the operator is idempotent; any other API error is raised.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:KubernetesDeleteJobOperator`

    :param name: name of the Job.
    :param namespace: the namespace to run within kubernetes.
    :param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>`
        for the Kubernetes cluster.
    :param config_file: The path to the Kubernetes config file. (templated)
        If not specified, default value is ``~/.kube/config``
    :param in_cluster: run kubernetes client with in_cluster configuration.
    :param cluster_context: context that points to kubernetes cluster.
        Ignored when in_cluster is True. If None, current-context is used. (templated)
    """

    template_fields: Sequence[str] = (
        "config_file",
        "namespace",
        "cluster_context",
    )

    def __init__(
        self,
        *,
        name: str,
        namespace: str,
        kubernetes_conn_id: str | None = KubernetesHook.default_conn_name,
        config_file: str | None = None,
        in_cluster: bool | None = None,
        cluster_context: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.name = name
        self.namespace = namespace
        self.kubernetes_conn_id = kubernetes_conn_id
        self.config_file = config_file
        self.in_cluster = in_cluster
        self.cluster_context = cluster_context

    @cached_property
    def hook(self) -> KubernetesHook:
        # Built lazily so that templated fields (config_file, cluster_context)
        # are already rendered when the hook is first used.
        return KubernetesHook(
            conn_id=self.kubernetes_conn_id,
            in_cluster=self.in_cluster,
            config_file=self.config_file,
            cluster_context=self.cluster_context,
        )

    @cached_property
    def client(self) -> BatchV1Api:
        # Batch API client used for Job resources.
        return self.hook.batch_v1_client

    def execute(self, context: Context):
        """Delete the named Job, treating a missing Job (404) as success."""
        try:
            self.log.info("Deleting kubernetes Job: %s", self.name)
            self.client.delete_namespaced_job(name=self.name, namespace=self.namespace)
            self.log.info("Kubernetes job was deleted.")
        except ApiException as e:
            if e.status == 404:
                # Job already gone: idempotent delete, nothing to do.
                self.log.info("The Kubernetes job %s does not exist.", self.name)
            else:
                # Bare raise keeps the original traceback intact.
                raise
109 changes: 108 additions & 1 deletion airflow/providers/google/cloud/operators/kubernetes_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
from airflow.providers.cncf.kubernetes.operators.job import KubernetesDeleteJobOperator, KubernetesJobOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.operators.resource import (
KubernetesCreateResourceOperator,
Expand Down Expand Up @@ -1346,3 +1346,110 @@ def __init__(
self.suspend = True
self.labels.update({"kueue.x-k8s.io/queue-name": queue_name})
self.annotations.update({"kueue.x-k8s.io/queue-name": queue_name})


class GKEDeleteJobOperator(KubernetesDeleteJobOperator):
    """
    Delete a Kubernetes job in the specified Google Kubernetes Engine cluster.

    This Operator assumes that the system has gcloud installed and has configured a
    connection id with a service account.

    The **minimum** required to define a cluster to create are the variables
    ``task_id``, ``project_id``, ``location``, ``cluster_name``, ``name``,
    ``namespace``

    .. seealso::
        For more detail about Kubernetes Engine authentication have a look at the reference:
        https://cloud.google.com/kubernetes-engine/docs/how-to/cluster-access-for-kubectl#internal_ip

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GKEDeleteJobOperator`

    :param location: The name of the Google Kubernetes Engine zone or region in which the
        cluster resides, e.g. 'us-central1-a'
    :param cluster_name: The name of the Google Kubernetes Engine cluster
    :param use_internal_ip: Use the internal IP address as the endpoint.
    :param project_id: The Google Developers Console project id
    :param gcp_conn_id: The Google cloud connection id to use. This allows for
        users to specify a service account.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    template_fields: Sequence[str] = tuple(
        {"project_id", "location", "cluster_name"}.union(KubernetesDeleteJobOperator.template_fields)
    )

    def __init__(
        self,
        *,
        location: str,
        cluster_name: str,
        use_internal_ip: bool = False,
        project_id: str | None = None,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        # GKE-specific configuration.
        self.location = location
        self.cluster_name = cluster_name
        self.project_id = project_id
        self.use_internal_ip = use_internal_ip
        # Credentials configuration.
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain
        # Populated at execute() time from the cluster's auth details.
        self._cluster_url: str | None = None
        self._ssl_ca_cert: str | None = None

        if self.gcp_conn_id is None:
            raise AirflowException(
                "The gcp_conn_id parameter has become required. If you want to use Application Default "
                "Credentials (ADC) strategy for authorization, create an empty connection "
                "called `google_cloud_default`.",
            )
        # There is no need to manage the kube_config file, as it will be generated automatically.
        # All Kubernetes parameters (except config_file) are also valid for the GKEDeleteJobOperator.
        if self.config_file:
            raise AirflowException("config_file is not an allowed parameter for the GKEDeleteJobOperator.")

    @cached_property
    def cluster_hook(self) -> GKEHook:
        # Talks to the GKE control plane to resolve cluster connection details.
        return GKEHook(
            gcp_conn_id=self.gcp_conn_id,
            location=self.location,
            impersonation_chain=self.impersonation_chain,
        )

    @cached_property
    def hook(self) -> GKEJobHook:
        if self._cluster_url is None or self._ssl_ca_cert is None:
            raise AttributeError(
                "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
                "Try to use self.get_kube_creds method",
            )
        return GKEJobHook(
            gcp_conn_id=self.gcp_conn_id,
            cluster_url=self._cluster_url,
            ssl_ca_cert=self._ssl_ca_cert,
        )

    def execute(self, context: Context):
        """Resolve the cluster endpoint/CA, then delete the Job via the parent class."""
        auth_details = GKEClusterAuthDetails(
            cluster_name=self.cluster_name,
            project_id=self.project_id,
            use_internal_ip=self.use_internal_ip,
            cluster_hook=self.cluster_hook,
        )
        self._cluster_url, self._ssl_ca_cert = auth_details.fetch_cluster_info()
        return super().execute(context)
15 changes: 15 additions & 0 deletions docs/apache-airflow-providers-cncf-kubernetes/operators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -628,3 +628,18 @@ Instead of ``template`` parameter for Pod creating this operator uses :class:`~a
It means that user can use all parameters from :class:`~airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator` in :class:`~airflow.providers.cncf.kubernetes.operators.job.KubernetesJobOperator`.

More information about the Jobs here: `Kubernetes Job Documentation <https://kubernetes.io/docs/concepts/workloads/controllers/job/>`__


.. _howto/operator:KubernetesDeleteJobOperator:

KubernetesDeleteJobOperator
===========================

The :class:`~airflow.providers.cncf.kubernetes.operators.job.KubernetesDeleteJobOperator` allows
you to delete Jobs on a Kubernetes cluster.

.. exampleinclude:: /../../tests/system/providers/cncf/kubernetes/example_kubernetes_job.py
:language: python
:dedent: 4
:start-after: [START howto_operator_delete_k8s_job]
:end-before: [END howto_operator_delete_k8s_job]
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,27 @@ To run a Job on a GKE cluster with Kueue enabled, use ``GKEStartKueueJobOperator``
:end-before: [END howto_operator_kueue_start_job]


.. _howto/operator:GKEDeleteJobOperator:

Delete a Job on a GKE cluster
"""""""""""""""""""""""""""""

There are two operators available in order to delete a job on a GKE cluster:

* :class:`~airflow.providers.cncf.kubernetes.operators.job.KubernetesDeleteJobOperator`
* :class:`~airflow.providers.google.cloud.operators.kubernetes_engine.GKEDeleteJobOperator`

``GKEDeleteJobOperator`` extends ``KubernetesDeleteJobOperator`` to provide authorization using Google Cloud credentials.
There is no need to manage the ``kube_config`` file, as it will be generated automatically.
All Kubernetes parameters (except ``config_file``) are also valid for the ``GKEDeleteJobOperator``.

.. exampleinclude:: /../../tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gke_delete_job]
:end-before: [END howto_operator_gke_delete_job]


.. _howto/operator:GKEDescribeJobOperator:

Retrieve information about Job by given name
Expand Down
28 changes: 27 additions & 1 deletion tests/providers/cncf/kubernetes/operators/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

from airflow.exceptions import AirflowException
from airflow.models import DAG, DagModel, DagRun, TaskInstance
from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
from airflow.providers.cncf.kubernetes.operators.job import KubernetesDeleteJobOperator, KubernetesJobOperator
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.types import DagRunType
Expand Down Expand Up @@ -519,3 +519,29 @@ def test_wait_until_job_complete(
namespace=mock_job_expected.metadata.namespace,
job_poll_interval=POLL_INTERVAL,
)


@pytest.mark.execution_timeout(300)
class TestKubernetesDeleteJobOperator:
    """Unit tests for KubernetesDeleteJobOperator.execute."""

    @pytest.fixture(autouse=True)
    def setup_tests(self):
        # Prevent the hook from building a real Kubernetes API client.
        self._default_client_patch = patch(f"{HOOK_CLASS}._get_default_client")
        self._default_client_mock = self._default_client_patch.start()

        yield

        patch.stopall()

    @patch("kubernetes.config.load_kube_config")
    @patch("kubernetes.client.api.BatchV1Api.delete_namespaced_job")
    def test_delete_execute(self, mock_delete_namespaced_job, mock_load_kube_config):
        op = KubernetesDeleteJobOperator(
            kubernetes_conn_id="kubernetes_default",
            task_id="test_delete_job",
            name="test_job_name",
            namespace="test_job_namespace",
        )

        op.execute(None)

        # Assert the exact delete call (name + namespace), not merely that
        # *some* call happened: assert_called() would pass even if the
        # operator deleted the wrong Job or passed wrong arguments.
        mock_delete_namespaced_job.assert_called_once_with(
            name="test_job_name", namespace="test_job_namespace"
        )
Loading

0 comments on commit 29ac05f

Please sign in to comment.