From 64a297af075ac53c307aac1036fcc1c408ffcb15 Mon Sep 17 00:00:00 2001 From: Ulada Zakharava Date: Tue, 20 Feb 2024 09:19:28 +0000 Subject: [PATCH] Add GKEListJobsOperator and GKEDescribeJobOperator --- .../cncf/kubernetes/hooks/kubernetes.py | 25 ++ .../google/cloud/links/kubernetes_engine.py | 29 +++ .../cloud/operators/kubernetes_engine.py | 230 ++++++++++++++++++ airflow/providers/google/provider.yaml | 1 + .../operators/cloud/kubernetes_engine.rst | 31 +++ .../cloud/operators/test_kubernetes_engine.py | 102 ++++++++ .../example_kubernetes_engine_job.py | 19 ++ 7 files changed, 437 insertions(+) diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index 64053b92a15c2..c950e3504468c 100644 --- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -37,6 +37,7 @@ from airflow.utils import yaml if TYPE_CHECKING: + from kubernetes.client import V1JobList from kubernetes.client.models import V1Deployment, V1Job, V1Pod LOADING_KUBE_CONFIG_FILE_RESOURCE = "Loading Kubernetes configuration file kube_config from {}..." @@ -502,6 +503,30 @@ def create_job( raise e return resp + def get_job(self, job_name: str, namespace: str) -> V1Job: + """Get Job of specified name and namespace. + + :param job_name: Name of Job to fetch. + :param namespace: Namespace of the Job. + :return: Job object + """ + return self.batch_v1_client.read_namespaced_job(name=job_name, namespace=namespace, pretty=True) + + def list_jobs_all_namespaces(self) -> V1JobList: + """Get list of Jobs from all namespaces. + + :return: V1JobList object + """ + return self.batch_v1_client.list_job_for_all_namespaces(pretty=True) + + def list_jobs_from_namespace(self, namespace: str) -> V1JobList: + """Get list of Jobs from dedicated namespace. + + :param namespace: Namespace of the Job.
+ :return: V1JobList object + """ + return self.batch_v1_client.list_namespaced_job(namespace=namespace, pretty=True) + def _get_bool(val) -> bool | None: """Convert val to bool if can be done with certainty; if we cannot infer intention we return None.""" diff --git a/airflow/providers/google/cloud/links/kubernetes_engine.py b/airflow/providers/google/cloud/links/kubernetes_engine.py index ba59d02b55222..b7dd2840e4014 100644 --- a/airflow/providers/google/cloud/links/kubernetes_engine.py +++ b/airflow/providers/google/cloud/links/kubernetes_engine.py @@ -38,6 +38,11 @@ KUBERNETES_BASE_LINK + "/job/{location}/{cluster_name}/{namespace}/{job_name}/details?project={project_id}" ) +KUBERNETES_WORKLOADS_LINK = ( + KUBERNETES_BASE_LINK + + '/workload/overview?project={project_id}&pageState=("savedViews":' + '("c":%5B"gke%2F{location}%2F{cluster_name}"%5D,"n":%5B"{namespace}"%5D))' +) class KubernetesEngineClusterLink(BaseGoogleLink): @@ -111,3 +116,27 @@ def persist( "project_id": task_instance.project_id, }, ) + + +class KubernetesEngineWorkloadsLink(BaseGoogleLink): + """Helper class for constructing Kubernetes Engine Workloads Link.""" + + name = "Kubernetes Workloads" + key = "kubernetes_workloads_conf" + format_str = KUBERNETES_WORKLOADS_LINK + + @staticmethod + def persist( + context: Context, + task_instance, + ): + task_instance.xcom_push( + context=context, + key=KubernetesEngineWorkloadsLink.key, + value={ + "location": task_instance.location, + "cluster_name": task_instance.cluster_name, + "namespace": task_instance.namespace, + "project_id": task_instance.project_id, + }, + ) diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py b/airflow/providers/google/cloud/operators/kubernetes_engine.py index 204107c0235f2..6aeb03f85ecc5 100644 --- a/airflow/providers/google/cloud/operators/kubernetes_engine.py +++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py @@ -45,6 +45,7 @@ KubernetesEngineClusterLink, KubernetesEngineJobLink, 
KubernetesEnginePodLink, + KubernetesEngineWorkloadsLink, ) from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator from airflow.providers.google.cloud.triggers.kubernetes_engine import GKEOperationTrigger, GKEStartPodTrigger @@ -898,3 +899,232 @@ def execute(self, context: Context): ).fetch_cluster_info() return super().execute(context) + + +class GKEDescribeJobOperator(GoogleCloudBaseOperator): + """ + Retrieve information about Job by given name. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GKEDescribeJobOperator` + + :param job_name: The name of the Job to retrieve information about. + :param project_id: The Google Developers Console project id. + :param location: The name of the Google Kubernetes Engine zone or region in which the cluster + resides. + :param cluster_name: The name of the Google Kubernetes Engine cluster. + :param namespace: The name of the Google Kubernetes Engine namespace. + :param use_internal_ip: Use the internal IP address as the endpoint. + :param gcp_conn_id: The connection ID to use connecting to Google Cloud. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated).
+ """ + + template_fields: Sequence[str] = ( + "project_id", + "gcp_conn_id", + "job_name", + "namespace", + "cluster_name", + "location", + "impersonation_chain", + ) + operator_extra_links = (KubernetesEngineJobLink(),) + + def __init__( + self, + *, + job_name: str, + location: str, + namespace: str, + cluster_name: str, + project_id: str | None = None, + use_internal_ip: bool = False, + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + + self.project_id = project_id + self.gcp_conn_id = gcp_conn_id + self.location = location + self.job_name = job_name + self.namespace = namespace + self.cluster_name = cluster_name + self.use_internal_ip = use_internal_ip + self.impersonation_chain = impersonation_chain + + self.job: V1Job | None = None + + self._ssl_ca_cert: str | None = None + self._cluster_url: str | None = None + + if self.gcp_conn_id is None: + raise AirflowException( + "The gcp_conn_id parameter has become required. If you want to use Application Default " + "Credentials (ADC) strategy for authorization, create an empty connection " + "called `google_cloud_default`.", + ) + + @cached_property + def cluster_hook(self) -> GKEHook: + return GKEHook( + gcp_conn_id=self.gcp_conn_id, + location=self.location, + impersonation_chain=self.impersonation_chain, + ) + + @cached_property + def hook(self) -> GKEJobHook: + if self._cluster_url is None or self._ssl_ca_cert is None: + raise AttributeError( + "Cluster url and ssl_ca_cert should be defined before using self.hook method. 
" + "Try to use self.get_kube_creds method", + ) + + hook = GKEJobHook( + gcp_conn_id=self.gcp_conn_id, + cluster_url=self._cluster_url, + ssl_ca_cert=self._ssl_ca_cert, + ) + return hook + + def execute(self, context: Context) -> None: + self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails( + cluster_name=self.cluster_name, + project_id=self.project_id, + use_internal_ip=self.use_internal_ip, + cluster_hook=self.cluster_hook, + ).fetch_cluster_info() + + self.job = self.hook.get_job(job_name=self.job_name, namespace=self.namespace) + self.log.info( + "Retrieved description of Job %s from cluster %s:\n %s", + self.job_name, + self.cluster_name, + self.job, + ) + ti = context["ti"] + ti.xcom_push(key="job_name", value=self.job.metadata.name) + ti.xcom_push(key="job_namespace", value=self.job.metadata.namespace) + KubernetesEngineJobLink.persist(context=context, task_instance=self) + return None + + +class GKEListJobsOperator(GoogleCloudBaseOperator): + """ + Retrieve list of Jobs. + + If namespace parameter is specified, the list of Jobs from dedicated + namespace will be retrieved. If no namespace specified, it will output Jobs from all namespaces. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GKEListJobsOperator` + + :param project_id: The Google Developers Console project id. + :param location: The name of the Google Kubernetes Engine zone or region in which the cluster + resides. + :param cluster_name: The name of the Google Kubernetes Engine cluster. + :param namespace: The name of the Google Kubernetes Engine namespace. + :param use_internal_ip: Use the internal IP address as the endpoint. + :param gcp_conn_id: The connection ID to use connecting to Google Cloud. 
+ :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + template_fields: Sequence[str] = ( + "project_id", + "gcp_conn_id", + "namespace", + "cluster_name", + "location", + "impersonation_chain", + ) + operator_extra_links = (KubernetesEngineWorkloadsLink(),) + + def __init__( + self, + *, + location: str, + cluster_name: str, + namespace: str | None = None, + project_id: str | None = None, + use_internal_ip: bool = False, + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + + self.project_id = project_id + self.gcp_conn_id = gcp_conn_id + self.location = location + self.namespace = namespace + self.cluster_name = cluster_name + self.use_internal_ip = use_internal_ip + self.impersonation_chain = impersonation_chain + + self._ssl_ca_cert: str | None = None + self._cluster_url: str | None = None + + if self.gcp_conn_id is None: + raise AirflowException( + "The gcp_conn_id parameter has become required. 
If you want to use Application Default " + "Credentials (ADC) strategy for authorization, create an empty connection " + "called `google_cloud_default`.", + ) + + @cached_property + def cluster_hook(self) -> GKEHook: + return GKEHook( + gcp_conn_id=self.gcp_conn_id, + location=self.location, + impersonation_chain=self.impersonation_chain, + ) + + @cached_property + def hook(self) -> GKEJobHook: + if self._cluster_url is None or self._ssl_ca_cert is None: + raise AttributeError( + "Cluster url and ssl_ca_cert should be defined before using self.hook method. " + "Try to use self.get_kube_creds method", + ) + + hook = GKEJobHook( + gcp_conn_id=self.gcp_conn_id, + cluster_url=self._cluster_url, + ssl_ca_cert=self._ssl_ca_cert, + ) + return hook + + def execute(self, context: Context) -> None: + self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails( + cluster_name=self.cluster_name, + project_id=self.project_id, + use_internal_ip=self.use_internal_ip, + cluster_hook=self.cluster_hook, + ).fetch_cluster_info() + + if self.namespace: + jobs = self.hook.list_jobs_from_namespace(namespace=self.namespace) + else: + jobs = self.hook.list_jobs_all_namespaces() + for job in jobs.items: + self.log.info("Retrieved description of Job:\n %s", job) + KubernetesEngineWorkloadsLink.persist(context=context, task_instance=self) + return None diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml index 7edc6e137c7c7..4a38adbe0bfe3 100644 --- a/airflow/providers/google/provider.yaml +++ b/airflow/providers/google/provider.yaml @@ -1210,6 +1210,7 @@ extra-links: - airflow.providers.google.cloud.links.kubernetes_engine.KubernetesEngineClusterLink - airflow.providers.google.cloud.links.kubernetes_engine.KubernetesEnginePodLink - airflow.providers.google.cloud.links.kubernetes_engine.KubernetesEngineJobLink + - airflow.providers.google.cloud.links.kubernetes_engine.KubernetesEngineWorkloadsLink - 
airflow.providers.google.cloud.links.pubsub.PubSubSubscriptionLink - airflow.providers.google.cloud.links.pubsub.PubSubTopicLink - airflow.providers.google.cloud.links.cloud_memorystore.MemcachedInstanceDetailsLink diff --git a/docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst b/docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst index 3b6e3cb316271..36d706173552a 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst @@ -213,6 +213,37 @@ All Kubernetes parameters (except ``config_file``) are also valid for the ``GKES :start-after: [START howto_operator_gke_start_job] :end-before: [END howto_operator_gke_start_job] + +.. _howto/operator:GKEDescribeJobOperator: + +Retrieve information about Job by given name +"""""""""""""""""""""""""""""""""""""""""""" + +You can use :class:`~airflow.providers.google.cloud.operators.kubernetes_engine.GKEDescribeJobOperator` to retrieve +detailed description of existing Job by providing its name and namespace. + +.. exampleinclude:: /../../tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_gke_describe_job] + :end-before: [END howto_operator_gke_describe_job] + + +.. _howto/operator:GKEListJobsOperator: + +Retrieve list of Jobs +""""""""""""""""""""" + +You can use :class:`~airflow.providers.google.cloud.operators.kubernetes_engine.GKEListJobsOperator` to retrieve +list of existing Jobs. If ``namespace`` parameter is provided, output will include Jobs across given namespace. +If ``namespace`` parameter is not specified, the information across all the namespaces will be outputted. + +.. 
exampleinclude:: /../../tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_gke_list_jobs] + :end-before: [END howto_operator_gke_list_jobs] + Reference ^^^^^^^^^ diff --git a/tests/providers/google/cloud/operators/test_kubernetes_engine.py b/tests/providers/google/cloud/operators/test_kubernetes_engine.py index f610803a3463f..0684f1f55a9bd 100644 --- a/tests/providers/google/cloud/operators/test_kubernetes_engine.py +++ b/tests/providers/google/cloud/operators/test_kubernetes_engine.py @@ -34,6 +34,7 @@ from airflow.providers.google.cloud.operators.kubernetes_engine import ( GKECreateClusterOperator, GKEDeleteClusterOperator, + GKEDescribeJobOperator, GKEStartJobOperator, GKEStartKueueInsideClusterOperator, GKEStartPodOperator, @@ -58,6 +59,7 @@ ) TASK_NAME = "test-task-name" +JOB_NAME = "test-job" NAMESPACE = ("default",) IMAGE = "bash" @@ -69,6 +71,7 @@ GKE_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEHook" GKE_POD_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEPodHook" GKE_DEPLOYMENT_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEDeploymentHook" +GKE_JOB_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEJobHook" KUB_OPERATOR_EXEC = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.execute" KUB_JOB_OPERATOR_EXEC = "airflow.providers.cncf.kubernetes.operators.job.KubernetesJobOperator.execute" TEMP_FILE = "tempfile.NamedTemporaryFile" @@ -759,3 +762,102 @@ def test_gcp_conn_id(self, get_con_mock): hook = gke_op.hook assert hook.gcp_conn_id == "test_conn" + + +class TestGKEDescribeJobOperator: + def setup_method(self): + self.gke_op = GKEDescribeJobOperator( + project_id=TEST_GCP_PROJECT_ID, + location=PROJECT_LOCATION, + cluster_name=CLUSTER_NAME, + task_id=PROJECT_TASK_ID, + job_name=JOB_NAME, + namespace=NAMESPACE, + ) + self.gke_op.job = mock.MagicMock( + name=TASK_NAME, + namespace=NAMESPACE, + ) + + @mock.patch.dict(os.environ, {}) + @mock.patch(TEMP_FILE) + 
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info") + @mock.patch(GKE_HOOK_PATH) + @mock.patch(GKE_JOB_HOOK_PATH) + def test_execute(self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock): + mock_job_hook.return_value.get_job.return_value = mock.MagicMock() + fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT) + self.gke_op.execute(context=mock.MagicMock()) + fetch_cluster_info_mock.assert_called_once() + + @mock.patch.dict(os.environ, {}) + @mock.patch( + "airflow.hooks.base.BaseHook.get_connections", + return_value=[Connection(extra=json.dumps({"keyfile_dict": '{"private_key": "r4nd0m_k3y"}'}))], + ) + @mock.patch(TEMP_FILE) + @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info") + @mock.patch(GKE_HOOK_PATH) + @mock.patch(GKE_JOB_HOOK_PATH) + def test_execute_with_impersonation_service_account( + self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock, get_con_mock + ): + mock_job_hook.return_value.get_job.return_value = mock.MagicMock() + fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT) + self.gke_op.impersonation_chain = "test_account@example.com" + self.gke_op.execute(context=mock.MagicMock()) + fetch_cluster_info_mock.assert_called_once() + + @mock.patch.dict(os.environ, {}) + @mock.patch( + "airflow.hooks.base.BaseHook.get_connections", + return_value=[Connection(extra=json.dumps({"keyfile_dict": '{"private_key": "r4nd0m_k3y"}'}))], + ) + @mock.patch(TEMP_FILE) + @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info") + @mock.patch(GKE_HOOK_PATH) + @mock.patch(GKE_JOB_HOOK_PATH) + def test_execute_with_impersonation_service_chain_one_element( + self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock, get_con_mock + ): + fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT) + self.gke_op.impersonation_chain = ["test_account@example.com"] + self.gke_op.execute(context=mock.MagicMock()) + + fetch_cluster_info_mock.assert_called_once() + + 
@pytest.mark.db_test + def test_default_gcp_conn_id(self): + gke_op = GKEDescribeJobOperator( + project_id=TEST_GCP_PROJECT_ID, + location=PROJECT_LOCATION, + cluster_name=CLUSTER_NAME, + task_id=PROJECT_TASK_ID, + job_name=TASK_NAME, + namespace=NAMESPACE, + ) + gke_op._cluster_url = CLUSTER_URL + gke_op._ssl_ca_cert = SSL_CA_CERT + hook = gke_op.hook + + assert hook.gcp_conn_id == "google_cloud_default" + + @mock.patch( + "airflow.providers.google.common.hooks.base_google.GoogleBaseHook.get_connection", + return_value=Connection(conn_id="test_conn"), + ) + def test_gcp_conn_id(self, get_con_mock): + gke_op = GKEDescribeJobOperator( + project_id=TEST_GCP_PROJECT_ID, + location=PROJECT_LOCATION, + cluster_name=CLUSTER_NAME, + task_id=PROJECT_TASK_ID, + job_name=TASK_NAME, + namespace=NAMESPACE, + gcp_conn_id="test_conn", + ) + gke_op._cluster_url = CLUSTER_URL + gke_op._ssl_ca_cert = SSL_CA_CERT + hook = gke_op.hook + + assert hook.gcp_conn_id == "test_conn" diff --git a/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py b/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py index e1acd576e7fed..a118bda06ba60 100644 --- a/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py +++ b/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py @@ -27,6 +27,8 @@ from airflow.providers.google.cloud.operators.kubernetes_engine import ( GKECreateClusterOperator, GKEDeleteClusterOperator, + GKEDescribeJobOperator, + GKEListJobsOperator, GKEStartJobOperator, ) @@ -65,6 +67,23 @@ ) # [END howto_operator_gke_start_job] + # [START howto_operator_gke_list_jobs] + list_job_task = GKEListJobsOperator( + task_id="list_job_task", project_id=GCP_PROJECT_ID, location=GCP_LOCATION, cluster_name=CLUSTER_NAME + ) + # [END howto_operator_gke_list_jobs] + + # [START howto_operator_gke_describe_job] + describe_job_task = GKEDescribeJobOperator( + 
task_id="describe_job_task", + project_id=GCP_PROJECT_ID, + location=GCP_LOCATION, + job_name=job_task.output["job_name"], + namespace="default", + cluster_name=CLUSTER_NAME, + ) + # [END howto_operator_gke_describe_job] + delete_cluster = GKEDeleteClusterOperator( task_id="delete_cluster", name=CLUSTER_NAME,