From 8dc1b2311626146894b09e51f3693de376e5ad87 Mon Sep 17 00:00:00 2001 From: raphaelauv Date: Wed, 22 Nov 2023 11:23:46 +0100 Subject: [PATCH] feat: K8S resource operator - CRD (#35600) * feat: K8S resource operator - CRD * clean * tests * remove sensor ( for another PR ) * clean * test on k8s_resource_iterator --- .../cncf/kubernetes/operators/resource.py | 62 ++++++++++++++---- .../kubernetes/utils/k8s_resource_iterator.py | 46 +++++++++++++ .../kubernetes/operators/test_resource.py | 63 ++++++++++++++++++ .../utils/test_k8s_resource_iterator.py | 65 +++++++++++++++++++ 4 files changed, 225 insertions(+), 11 deletions(-) create mode 100644 airflow/providers/cncf/kubernetes/utils/k8s_resource_iterator.py create mode 100644 tests/providers/cncf/kubernetes/utils/test_k8s_resource_iterator.py diff --git a/airflow/providers/cncf/kubernetes/operators/resource.py b/airflow/providers/cncf/kubernetes/operators/resource.py index 598731b63939f..569b5861a6e2c 100644 --- a/airflow/providers/cncf/kubernetes/operators/resource.py +++ b/airflow/providers/cncf/kubernetes/operators/resource.py @@ -27,9 +27,10 @@ from airflow.models import BaseOperator from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_yaml +from airflow.providers.cncf.kubernetes.utils.k8s_resource_iterator import k8s_resource_iterator if TYPE_CHECKING: - from kubernetes.client import ApiClient + from kubernetes.client import ApiClient, CustomObjectsApi __all__ = ["KubernetesCreateResourceOperator", "KubernetesDeleteResourceOperator"] @@ -56,17 +57,23 @@ def __init__( yaml_conf: str, namespace: str | None = None, kubernetes_conn_id: str | None = KubernetesHook.default_conn_name, + custom_resource_definition: bool = False, **kwargs, ) -> None: super().__init__(**kwargs) self._namespace = namespace self.kubernetes_conn_id = kubernetes_conn_id self.yaml_conf = yaml_conf + self.custom_resource_definition = 
def get_crd_fields(self, body: dict) -> tuple[str, str, str, str]:
    """
    Derive the arguments of a namespaced custom-object API call from a manifest.

    :param body: One parsed YAML document describing a custom resource. It must
        contain ``apiVersion`` and ``kind``; ``metadata.namespace`` is optional.
    :return: ``(group, version, namespace, plural)``, in the positional order in
        which they are handed to ``create_namespaced_custom_object`` and
        ``delete_namespaced_custom_object``.
    :raises KeyError: If ``apiVersion`` or ``kind`` is missing from ``body``.
    """
    # ``apiVersion`` has the form "<group>/<version>". ``str.partition`` splits on
    # the first "/" only and, unlike slicing with ``str.find``, is safe when there
    # is no "/" at all: ``find`` returns -1 in that case, which used to turn "v1"
    # into the bogus group "v". A group-less apiVersion now yields an empty group.
    api_version = body["apiVersion"]
    group, separator, version = api_version.partition("/")
    if not separator:
        group, version = "", api_version

    # ``metadata`` may be absent or explicitly null in the manifest.
    metadata = body.get("metadata") or {}
    namespace = metadata.get("namespace")
    if namespace is None:
        # Nothing in the manifest: fall back to the operator-level namespace.
        namespace = self.get_namespace()

    # NOTE(review): the plural resource name is guessed by lower-casing the kind
    # and appending "s". Kinds with an irregular plural may not match the name
    # registered by their CRD -- confirm before relying on it for such kinds.
    plural = body["kind"].lower() + "s"

    return group, version, namespace, plural
self.custom_object_client.delete_namespaced_custom_object(group, version, namespace, plural, name) + def execute(self, context) -> None: - delete_from_yaml( - k8s_client=self.client, - yaml_objects=yaml.safe_load_all(self.yaml_conf), - namespace=self.get_namespace(), - ) + resources = yaml.safe_load_all(self.yaml_conf) + if not self.custom_resource_definition: + delete_from_yaml( + k8s_client=self.client, + yaml_objects=resources, + namespace=self.get_namespace(), + ) + else: + k8s_resource_iterator(self.delete_custom_from_yaml_object, resources) diff --git a/airflow/providers/cncf/kubernetes/utils/k8s_resource_iterator.py b/airflow/providers/cncf/kubernetes/utils/k8s_resource_iterator.py new file mode 100644 index 0000000000000..bfa1d05272ccf --- /dev/null +++ b/airflow/providers/cncf/kubernetes/utils/k8s_resource_iterator.py @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
def _flatten_k8s_resources(resources):
    """Yield each individual resource document, expanding ``*List`` wrappers."""
    for doc in resources:
        # Empty YAML documents are parsed as ``None``; there is nothing to apply.
        if doc is None:
            continue

        kind = doc["kind"]
        items = doc.get("items")
        # A wrapper is a kind that *ends* in "List" and really carries a list of
        # items. A plain substring test would also match single resources whose
        # kind merely contains "List" (e.g. "AllowListPolicy") and then fail on
        # the missing ``items`` key.
        if not (kind.endswith("List") and isinstance(items, list)):
            yield doc
            continue

        item_kind = kind[: -len("List")]
        for item in items:
            if item_kind:
                # A typed list ("FooList") dictates the kind and apiVersion of
                # its items; the generic "List" wrapper leaves each item's own
                # values untouched.
                item["apiVersion"] = doc["apiVersion"]
                item["kind"] = item_kind
            yield item


def k8s_resource_iterator(callback: Callable[[dict], None], resources: Iterator) -> None:
    """
    Apply ``callback`` to every Kubernetes resource found in ``resources``.

    ``None`` documents are skipped and ``List``-style wrappers are expanded so
    that ``callback`` only ever receives single resources. All resources are
    attempted even if some of them fail.

    :param callback: Called once per resource document with that document.
    :param resources: Iterable of parsed YAML documents, for example the result
        of ``yaml.safe_load_all``.
    :raises FailToCreateError: After every resource has been attempted, if any
        call raised ``FailToCreateError`` or ``FailToDeleteError``. It carries
        the concatenated ``api_exceptions`` of all failures. The aggregate is a
        ``FailToCreateError`` even when the failures came from deletions; the
        type is kept so existing ``except FailToCreateError`` handlers still work.
    """
    failures: list = []
    for resource in _flatten_k8s_resources(resources):
        try:
            callback(resource)
        except (FailToCreateError, FailToDeleteError) as failure:
            # Keep going so one bad resource does not hide the outcome of the rest.
            failures.extend(failure.api_exceptions)
    if failures:
        raise FailToCreateError(failures)
kubernetes_conn_id="kubernetes_default", + task_id="test_task_id", + ) + + op.execute(context) + + assert mock_create_namespaced_persistent_volume_claim.call_count == 2 + @patch("kubernetes.client.api.CoreV1Api.delete_namespaced_persistent_volume_claim") def test_single_delete_application_from_yaml( self, mock_delete_namespaced_persistent_volume_claim, context @@ -118,3 +141,43 @@ def test_multi_delete_application_from_yaml( op.execute(context) mock_delete_namespaced_persistent_volume_claim.assert_called() + + @patch("kubernetes.client.api.CustomObjectsApi.create_namespaced_custom_object") + def test_create_custom_application_from_yaml(self, mock_create_namespaced_custom_object, context): + op = KubernetesCreateResourceOperator( + yaml_conf=TEST_VALID_CRD_YAML, + dag=self.dag, + kubernetes_conn_id="kubernetes_default", + task_id="test_task_id", + custom_resource_definition=True, + ) + + op.execute(context) + + mock_create_namespaced_custom_object.assert_called_once_with( + "ray.io", + "v1", + "default", + "rayjobs", + yaml.safe_load(TEST_VALID_CRD_YAML), + ) + + @patch("kubernetes.client.api.CustomObjectsApi.delete_namespaced_custom_object") + def test_delete_custom_application_from_yaml(self, mock_delete_namespaced_custom_object, context): + op = KubernetesDeleteResourceOperator( + yaml_conf=TEST_VALID_CRD_YAML, + dag=self.dag, + kubernetes_conn_id="kubernetes_default", + task_id="test_task_id", + custom_resource_definition=True, + ) + + op.execute(context) + + mock_delete_namespaced_custom_object.assert_called_once_with( + "ray.io", + "v1", + "default", + "rayjobs", + "rayjob-sample", + ) diff --git a/tests/providers/cncf/kubernetes/utils/test_k8s_resource_iterator.py b/tests/providers/cncf/kubernetes/utils/test_k8s_resource_iterator.py new file mode 100644 index 0000000000000..8a2e4f05396f4 --- /dev/null +++ b/tests/providers/cncf/kubernetes/utils/test_k8s_resource_iterator.py @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or 
def test_k8s_resource_iterator():
    """Failures from every list item are aggregated; a clean run raises nothing."""
    exception_k8s = namedtuple("Exception_k8s", "reason body")

    def failing_callback(yml_doc: dict) -> None:
        # FailToCreateError must be given a *list* of API exceptions: the
        # iterator extends its failure list with ``api_exceptions``, so a bare
        # namedtuple would be unpacked into its two string fields and the
        # aggregated error could no longer be rendered.
        raise FailToCreateError([exception_k8s("the_reason", "the_body ")])

    with pytest.raises(FailToCreateError) as exc_info:
        k8s_resource_iterator(
            failing_callback, resources=yaml.safe_load_all(TEST_VALID_LIST_RESOURCE_YAML)
        )

    # Two items in the list -> two aggregated failures, rendered back to back.
    assert (
        str(exc_info.value)
        == "Error from server (the_reason): the_body Error from server (the_reason): the_body "
    )

    def callback_success(yml_doc: dict) -> None:
        return

    # No try/except needed: an unexpected FailToCreateError propagates and fails
    # the test with its real traceback instead of an uninformative ``assert False``.
    k8s_resource_iterator(callback_success, resources=yaml.safe_load_all(TEST_VALID_LIST_RESOURCE_YAML))