Skip to content

Commit

Permalink
feat: K8S resource operator - CRD (apache#35600)
Browse files Browse the repository at this point in the history
* feat: K8S resource operator - CRD

* clean

* tests

* remove sensor ( for another PR )

* clean

* test on k8s_resource_iterator
  • Loading branch information
raphaelauv authored Nov 22, 2023
1 parent 6e8f646 commit 8dc1b23
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 11 deletions.
62 changes: 51 additions & 11 deletions airflow/providers/cncf/kubernetes/operators/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@
from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_yaml
from airflow.providers.cncf.kubernetes.utils.k8s_resource_iterator import k8s_resource_iterator

if TYPE_CHECKING:
from kubernetes.client import ApiClient
from kubernetes.client import ApiClient, CustomObjectsApi

__all__ = ["KubernetesCreateResourceOperator", "KubernetesDeleteResourceOperator"]

Expand All @@ -56,17 +57,23 @@ def __init__(
yaml_conf: str,
namespace: str | None = None,
kubernetes_conn_id: str | None = KubernetesHook.default_conn_name,
custom_resource_definition: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
self._namespace = namespace
self.kubernetes_conn_id = kubernetes_conn_id
self.yaml_conf = yaml_conf
self.custom_resource_definition = custom_resource_definition

@cached_property
def client(self) -> ApiClient:
return self.hook.api_client

@cached_property
def custom_object_client(self) -> CustomObjectsApi:
return self.hook.custom_object_client

@cached_property
def hook(self) -> KubernetesHook:
hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
Expand All @@ -78,24 +85,57 @@ def get_namespace(self) -> str:
else:
return self.hook.get_namespace() or "default"

def get_crd_fields(self, body: dict) -> tuple[str, str, str, str]:
api_version = body["apiVersion"]
group = api_version[0 : api_version.find("/")]
version = api_version[api_version.find("/") + 1 :]

namespace = None
if body.get("metadata"):
metadata: dict = body.get("metadata", None)
namespace = metadata.get("namespace", None)
if namespace is None:
namespace = self.get_namespace()

plural = body["kind"].lower() + "s"

return group, version, namespace, plural


class KubernetesCreateResourceOperator(KubernetesResourceBaseOperator):
"""Create a resource in a kubernetes."""

def create_custom_from_yaml_object(self, body: dict):
group, version, namespace, plural = self.get_crd_fields(body)
self.custom_object_client.create_namespaced_custom_object(group, version, namespace, plural, body)

def execute(self, context) -> None:
create_from_yaml(
k8s_client=self.client,
yaml_objects=yaml.safe_load_all(self.yaml_conf),
namespace=self.get_namespace(),
)
resources = yaml.safe_load_all(self.yaml_conf)
if not self.custom_resource_definition:
create_from_yaml(
k8s_client=self.client,
yaml_objects=resources,
namespace=self.get_namespace(),
)
else:
k8s_resource_iterator(self.create_custom_from_yaml_object, resources)


class KubernetesDeleteResourceOperator(KubernetesResourceBaseOperator):
"""Delete a resource in a kubernetes."""

def delete_custom_from_yaml_object(self, body: dict):
name = body["metadata"]["name"]
group, version, namespace, plural = self.get_crd_fields(body)
self.custom_object_client.delete_namespaced_custom_object(group, version, namespace, plural, name)

def execute(self, context) -> None:
delete_from_yaml(
k8s_client=self.client,
yaml_objects=yaml.safe_load_all(self.yaml_conf),
namespace=self.get_namespace(),
)
resources = yaml.safe_load_all(self.yaml_conf)
if not self.custom_resource_definition:
delete_from_yaml(
k8s_client=self.client,
yaml_objects=resources,
namespace=self.get_namespace(),
)
else:
k8s_resource_iterator(self.delete_custom_from_yaml_object, resources)
46 changes: 46 additions & 0 deletions airflow/providers/cncf/kubernetes/utils/k8s_resource_iterator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import Callable, Iterator

from kubernetes.utils import FailToCreateError

from airflow.providers.cncf.kubernetes.utils.delete_from import FailToDeleteError


def k8s_resource_iterator(callback: Callable[[dict], None], resources: Iterator) -> None:
failures: list = []
for data in resources:
if data is not None:
if "List" in data["kind"]:
kind = data["kind"].replace("List", "")
for yml_doc in data["items"]:
if kind != "":
yml_doc["apiVersion"] = data["apiVersion"]
yml_doc["kind"] = kind
try:
callback(yml_doc)
except (FailToCreateError, FailToDeleteError) as failure:
failures.extend(failure.api_exceptions)
else:
try:
callback(data)
except (FailToCreateError, FailToDeleteError) as failure:
failures.extend(failure.api_exceptions)
if failures:
raise FailToCreateError(failures)
63 changes: 63 additions & 0 deletions tests/providers/cncf/kubernetes/operators/test_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@
name: test_pvc_2
"""

TEST_VALID_CRD_YAML = """
apiVersion: ray.io/v1
kind: RayJob
metadata:
name: rayjob-sample
spec:
entrypoint: python /home/ray/program/job.py
shutdownAfterJobFinishes: true
"""

HOOK_CLASS = "airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook"


Expand Down Expand Up @@ -89,6 +99,19 @@ def test_create_application_from_yaml(self, mock_create_namespaced_persistent_vo
body=yaml.safe_load(TEST_VALID_RESOURCE_YAML), namespace="default"
)

@patch("kubernetes.client.api.CoreV1Api.create_namespaced_persistent_volume_claim")
def test_create_application_from_yaml_list(self, mock_create_namespaced_persistent_volume_claim, context):
op = KubernetesCreateResourceOperator(
yaml_conf=TEST_VALID_LIST_RESOURCE_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
)

op.execute(context)

assert mock_create_namespaced_persistent_volume_claim.call_count == 2

@patch("kubernetes.client.api.CoreV1Api.delete_namespaced_persistent_volume_claim")
def test_single_delete_application_from_yaml(
self, mock_delete_namespaced_persistent_volume_claim, context
Expand Down Expand Up @@ -118,3 +141,43 @@ def test_multi_delete_application_from_yaml(
op.execute(context)

mock_delete_namespaced_persistent_volume_claim.assert_called()

@patch("kubernetes.client.api.CustomObjectsApi.create_namespaced_custom_object")
def test_create_custom_application_from_yaml(self, mock_create_namespaced_custom_object, context):
op = KubernetesCreateResourceOperator(
yaml_conf=TEST_VALID_CRD_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
custom_resource_definition=True,
)

op.execute(context)

mock_create_namespaced_custom_object.assert_called_once_with(
"ray.io",
"v1",
"default",
"rayjobs",
yaml.safe_load(TEST_VALID_CRD_YAML),
)

@patch("kubernetes.client.api.CustomObjectsApi.delete_namespaced_custom_object")
def test_delete_custom_application_from_yaml(self, mock_delete_namespaced_custom_object, context):
op = KubernetesDeleteResourceOperator(
yaml_conf=TEST_VALID_CRD_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
custom_resource_definition=True,
)

op.execute(context)

mock_delete_namespaced_custom_object.assert_called_once_with(
"ray.io",
"v1",
"default",
"rayjobs",
"rayjob-sample",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from collections import namedtuple

import pytest
import yaml
from kubernetes.utils import FailToCreateError

from airflow.providers.cncf.kubernetes.utils.k8s_resource_iterator import k8s_resource_iterator

TEST_VALID_LIST_RESOURCE_YAML = """
apiVersion: v1
kind: List
items:
- apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: test_pvc_1
- apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: test_pvc_2
"""


def test_k8s_resource_iterator():
exception_k8s = namedtuple("Exception_k8s", "reason body")

def test_callback_failing(yml_doc: dict) -> None:
raise FailToCreateError(exception_k8s("the_reason", "the_body "))

with pytest.raises(FailToCreateError) as exc_info:
k8s_resource_iterator(
test_callback_failing, resources=yaml.safe_load_all(TEST_VALID_LIST_RESOURCE_YAML)
)

assert (
str(exc_info.value)
== "Error from server (the_reason): the_body Error from server (the_reason): the_body "
)

def callback_success(yml_doc: dict) -> None:
return

try:
k8s_resource_iterator(callback_success, resources=yaml.safe_load_all(TEST_VALID_LIST_RESOURCE_YAML))

except FailToCreateError:
assert False

0 comments on commit 8dc1b23

Please sign in to comment.