diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 3cf0d14..94a25f7 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -2,6 +2,5 @@ - \ No newline at end of file diff --git a/build.py b/build.py index e0ccd34..724cd10 100644 --- a/build.py +++ b/build.py @@ -17,8 +17,8 @@ # import textwrap - from subprocess import check_call + from pybuilder.core import (use_plugin, init, Author, task) use_plugin("pypi:karellen_pyb_plugin", ">=0.0.1") @@ -57,7 +57,7 @@ def set_properties(project): project.depends_on("appdirs", "~=1.4") project.depends_on("requests", "~=2.25") project.depends_on("jsonpatch", "~=1.32") - project.depends_on("jsonpath-ng", "~=1.5") + project.depends_on("jsonpath-ng", "~=1.6.1") project.depends_on("jinja2", "~=3.1") project.depends_on("coloredlogs", "~=15.0") project.depends_on("jsonschema", "<4.0") @@ -81,12 +81,6 @@ def set_properties(project): project.set_property("distutils_setup_keywords", ["kubernetes", "k8s", "kube", "top", "provisioning", "kOps", "terraform", "tf", "AWS"]) - if False: - project.set_property("vendorize_target_dir", "$dir_source_main_python/kubernator/_vendor") - project.set_property("vendorize_packages", ["kubernetes~=28.0"]) - project.set_property("vendorize_cleanup_globs", []) - project.set_property("vendorize_preserve_metadata", []) - project.set_property("distutils_classifiers", [ "License :: OSI Approved :: Apache Software License", "Programming Language :: Python :: 3.9", @@ -112,7 +106,7 @@ def set_properties(project): # -*- coding: utf-8 -*- # # Copyright 2020 Express Systems USA, Inc - # Copyright 2021 Karellen, Inc. + # Copyright 2024 Karellen, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/src/integrationtest/python/issue_48/phase1/.kubernator.py b/src/integrationtest/python/issue_48/phase1/.kubernator.py new file mode 100644 index 0000000..a007c3f --- /dev/null +++ b/src/integrationtest/python/issue_48/phase1/.kubernator.py @@ -0,0 +1,23 @@ +# flake8: noqa +import os + +ktor.app.register_plugin("minikube", k8s_version=os.environ["K8S_VERSION"], + start_fresh=True, keep_running=True, profile="issue-48") +ktor.app.register_plugin("k8s") + +if False: + _old_req = ktor.k8s.client.rest_client.pool_manager.request + + + def request(method, url, fields=None, headers=None, **urlopen_kw): + resp = _old_req(method, url, fields=fields, headers=headers, **urlopen_kw) + logger.info("Send:\n%s %s\n\n%s\n\n%s", + method, url, "\n".join(map(lambda t: "%s: %s" % (t[0], t[1]), headers.items())), + urlopen_kw.get("body", "")) + logger.info("Recv:\n%s %s\n\n%s\n\n%s", + resp.status, resp.reason, "\n".join(map(lambda t: "%s: %s" % (t[0], t[1]), resp.headers.items())), + resp.data.decode("utf-8")) + return resp + + + ktor.k8s.client.rest_client.pool_manager.request = request diff --git a/src/integrationtest/python/issue_48/phase1/manifests.yaml b/src/integrationtest/python/issue_48/phase1/manifests.yaml new file mode 100644 index 0000000..d430ce3 --- /dev/null +++ b/src/integrationtest/python/issue_48/phase1/manifests.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: ns1 + annotations: + a: x +--- +apiVersion: v1 +kind: Namespace +metadata: + name: ns2 + annotations: + a: x diff --git a/src/integrationtest/python/issue_48/phase2/.kubernator.py b/src/integrationtest/python/issue_48/phase2/.kubernator.py new file mode 100644 index 0000000..2864189 --- /dev/null +++ b/src/integrationtest/python/issue_48/phase2/.kubernator.py @@ -0,0 +1,23 @@ +# flake8: noqa +import os + +ktor.app.register_plugin("minikube", k8s_version=os.environ["K8S_VERSION"], + start_fresh=False, keep_running=False, profile="issue-48") +ktor.app.register_plugin("k8s") + +if False: + _old_req = ktor.k8s.client.rest_client.pool_manager.request + + + def request(method, url, fields=None, headers=None, **urlopen_kw): + resp = _old_req(method, url, fields=fields, headers=headers, **urlopen_kw) + logger.info("Send:\n%s %s\n\n%s\n\n%s", + method, url, "\n".join(map(lambda t: "%s: %s" % (t[0], t[1]), headers.items())), + urlopen_kw.get("body", "")) + logger.info("Recv:\n%s %s\n\n%s\n\n%s", + resp.status, resp.reason, "\n".join(map(lambda t: "%s: %s" % (t[0], t[1]), resp.headers.items())), + resp.data.decode("utf-8")) + return resp + + + ktor.k8s.client.rest_client.pool_manager.request = request diff --git a/src/integrationtest/python/issue_48/phase2/manifests.yaml b/src/integrationtest/python/issue_48/phase2/manifests.yaml new file mode 100644 index 0000000..c6e3c63 --- /dev/null +++ b/src/integrationtest/python/issue_48/phase2/manifests.yaml @@ -0,0 +1,16 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: ns1 + annotations: + $patch: replace + b: y +--- +apiVersion: v1 +kind: Namespace +metadata: + name: ns2 + annotations: + $patch: delete + b: y + diff --git a/src/integrationtest/python/issue_48_tests.py b/src/integrationtest/python/issue_48_tests.py new file mode 100644 index 0000000..ef0561d --- /dev/null +++ b/src/integrationtest/python/issue_48_tests.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2020 Express Systems USA, Inc +# Copyright 2023 Karellen, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from test_support import IntegrationTestSupport, unittest + +unittest # noqa +# Above import must be first + +from pathlib import Path # noqa: E402 +import os # noqa: E402 +import tempfile # noqa: E402 +import yaml # noqa: E402 +from pprint import pprint # noqa: E402 + + +class Issue48Test(IntegrationTestSupport): + def test_issue_48(self): + test_dir = Path(__file__).parent / "issue_48" + test_dir_1 = test_dir / "phase1" + test_dir_2 = test_dir / "phase2" + with tempfile.TemporaryDirectory() as results_dir: + results_file = Path(results_dir) / "results" + for k8s_version in (self.K8S_TEST_VERSIONS[-1],): + with self.subTest(k8s_version=k8s_version): + os.environ["K8S_VERSION"] = k8s_version + + self.run_module_test("kubernator", "-p", str(test_dir_1), "-v", "TRACE", "apply", "--yes") + self.run_module_test("kubernator", "-p", str(test_dir_2), + "-v", "TRACE", "-f", str(results_file), "dump") + + with open(results_file, "rb") as f: + results = list(yaml.safe_load_all(f)) + pprint(results) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/main/python/kubernator/_json_path.py b/src/main/python/kubernator/_json_path.py new file mode 100644 index 0000000..22e586c --- /dev/null +++ b/src/main/python/kubernator/_json_path.py @@ -0,0 +1,113 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2020 Express Systems USA, Inc +# Copyright 2024 Karellen, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import re +from functools import cache + +from jsonpath_ng import JSONPath, DatumInContext +from jsonpath_ng.ext import parse as jp_parse, parser +from jsonpath_ng.ext.string import DefintionInvalid + +__all__ = ["jp", "JPath"] + + +class JPath: + def __init__(self, pattern): + self.pattern = jp_parse(pattern) + + def find(self, val): + return self.pattern.find(val) + + def all(self, val): + return list(map(lambda x: x.value, self.find(val))) + + def first(self, val): + """Returns the first element or None if it doesn't exist""" + try: + return next(map(lambda x: x.value, self.find(val))) + except StopIteration: + return None + + def only(self, val): + """Returns the first and only element. + Raises ValueError if more than one value found + Raises KeyError if no value found + """ + m = map(lambda x: x.value, self.find(val)) + try: + v = next(m) + except StopIteration: + raise KeyError("no value found") + try: + next(m) + raise ValueError("more than one value returned") + except StopIteration: + return v + + +@cache +def jp(pattern) -> JPath: + return JPath(pattern) + + +MATCH = re.compile(r"match\(/(.*)(? JPath: - return JPath(pattern) - - def scan_dir(logger, path: Path, path_filter: Callable[[os.DirEntry], bool], excludes, includes): logger.debug("Scanning %s, excluding %s, including %s", path, excludes, includes) with os.scandir(path) as it: # type: Iterable[os.DirEntry] diff --git a/src/main/python/kubernator/merge.py b/src/main/python/kubernator/merge.py new file mode 100644 index 0000000..6198754 --- /dev/null +++ b/src/main/python/kubernator/merge.py @@ -0,0 +1,128 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2020 Express Systems USA, Inc +# Copyright 2024 Karellen, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from copy import deepcopy +from kubernator.api import jp +from jsonpath_ng.jsonpath import Index, Child + + +# https://github.com/kubernetes/community/blob/master/contributors/devel/sig-api-machinery/strategic-merge-patch.md + +def extract_merge_instructions(manifest, resource): + normalized_manifest = deepcopy(manifest) + change_instrs = jp('$..`match(/\\\\$patch|\\\\$deleteFromPrimitiveList\\/.*/)`').find(manifest) + + instructions = [] + for change_instr in change_instrs: + field = change_instr.path.fields[0] + context = change_instr.context.value + + list_of_maps = False + search_key = None + change_path = change_instr.full_path.left + if isinstance(change_path, Child) and isinstance(change_path.right, Index): + list_of_maps = True + index = change_path.right.index + change_path = change_path.left + clean_manifest_result: list = change_path.find(normalized_manifest)[0].value + search_key = context.copy() + del search_key[field] + del clean_manifest_result[index] + else: + clean_manifest_result = change_path.find(normalized_manifest)[0] + clean_manifest_result_context = clean_manifest_result.value + del clean_manifest_result_context[field] + + instruction_value = context[field] + if field == "$patch": + if instruction_value in ("replace", "delete"): + instructions.append(("patch", instruction_value, + change_path, list_of_maps, search_key)) + else: + raise ValueError("Invalid $patch instruction %r in resource %s at %s" % + (instruction_value, + resource, + change_path)) + elif field.startswith("$deleteFromPrimitiveList/"): + instructions.append(("delete-from-list", instruction_value, field[25:], + change_path)) + + return instructions, normalized_manifest + + +def apply_merge_instructions(merge_instrs, source_manifest, target_manifest, logger, resource): + for merge_instr in merge_instrs: + if merge_instr[0] == "patch": + op, op_type, change_path, list_of_maps, search_key = merge_instr + else: + op, delete_list, field_name, change_path = merge_instr + + if op == "patch": + source_obj = change_path.find(source_manifest)[0].value + merged_obj = change_path.find(target_manifest)[0].value + if op_type == "delete": + if list_of_maps: + logger.trace("Deleting locally in resource %s: %s from %s at %s", + resource, + search_key, + merged_obj, + change_path) + del_idxs = [] + for idx, obj in enumerate(merged_obj): + for k, v in search_key.items(): + if k in obj: + if v is not None and obj[k] == v: + del_idxs.append(idx) + + for idx in del_idxs: + del merged_obj[idx] + else: + logger.trace("Deleting locally in resource %s: %s at %s", + resource, + merged_obj, + change_path) + merged_datum = change_path.find(target_manifest)[0] + merged_datum.context.value[merged_datum.path.fields[0]] = None + elif op_type == "replace": + logger.trace("Replacing locally in resource %s: %s with %s at %s", + resource, + merged_obj, source_obj, + change_path) + merged_obj.clear() + if list_of_maps: + merged_obj.extend(source_obj) + else: + merged_obj.update(source_obj) + else: + raise ValueError("Invalid $patch instruction %s found at %s in resource %s", + op_type, change_path, resource) + + elif op == "delete-from-list": + merged_list: list = change_path.find(target_manifest)[0].value[field_name] + if not isinstance(merged_list, list): + raise ValueError("Not a list in resource %s: %s in %r at %s" % + (resource, merged_list, field_name, change_path)) + logger.trace("Deleting from list locally in resource %s: %s from %s in %r at %s", + resource, delete_list, merged_list, field_name, change_path) + for v in delete_list: + try: + merged_list.remove(v) + except ValueError: + logger.warning("No value %s to delete from list %s in %r at %s in resource %s", + v, merged_list, field_name, change_path, resource) + else: + raise RuntimeError("should never reach here") diff --git a/src/main/python/kubernator/plugins/k8s.py b/src/main/python/kubernator/plugins/k8s.py index bfe0446..a817cac 100644 --- a/src/main/python/kubernator/plugins/k8s.py +++ b/src/main/python/kubernator/plugins/k8s.py @@ -39,6 +39,7 @@ load_remote_file, StripNL, install_python_k8s_client) +from kubernator.merge import extract_merge_instructions, apply_merge_instructions from kubernator.plugins.k8s_api import (K8SResourcePluginMixin, K8SResource, K8SResourcePatchType, @@ -406,6 +407,13 @@ def create(exists_ok=False): return raise + merge_instrs, normalized_manifest = extract_merge_instructions(resource.manifest, resource) + if merge_instrs: + logger.trace("Normalized manifest (no merge instructions) for resource %s: %s", resource, + normalized_manifest) + else: + normalized_manifest = resource.manifest + logger.debug("Applying resource %s%s", resource, status_msg) try: remote_resource = resource.get() @@ -421,9 +429,9 @@ def create(exists_ok=False): else: raise else: - logger.trace("Attempting to retrieve a normalized patch for resource %s: %s", resource, resource.manifest) + logger.trace("Attempting to retrieve a normalized patch for resource %s: %s", resource, normalized_manifest) try: - merged_resource = resource.patch(resource.manifest, + merged_resource = resource.patch(normalized_manifest, patch_type=K8SResourcePatchType.SERVER_SIDE_PATCH, dry_run=True, force=True) @@ -458,6 +466,9 @@ def create(exists_ok=False): raise else: logger.trace("Merged resource %s: %s", resource, merged_resource) + if merge_instrs: + apply_merge_instructions(merge_instrs, normalized_manifest, merged_resource, logger, resource) + patch = jsonpatch.make_patch(remote_resource, merged_resource) logger.trace("Resource %s initial patches are: %s", resource, patch) patch = self._filter_resource_patch(patch, patch_field_excludes) diff --git a/src/unittest/python/merge_tests.py b/src/unittest/python/merge_tests.py new file mode 100644 index 0000000..d404edf --- /dev/null +++ b/src/unittest/python/merge_tests.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2020 Express Systems USA, Inc +# Copyright 2021 Karellen, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from gevent.monkey import patch_all, is_anything_patched + +if not is_anything_patched(): + patch_all() + +import logging +import unittest +from kubernator.merge import extract_merge_instructions, apply_merge_instructions + +TRACE = 5 + + +def trace(self, msg, *args, **kwargs): + """ + Log 'msg % args' with severity 'TRACE'. + + To pass exception information, use the keyword argument exc_info with + a true value, e.g. + + logger.trace("Houston, we have a %s", "interesting problem", exc_info=1) + """ + if self.isEnabledFor(TRACE): + self._log(TRACE, msg, args, **kwargs) + + +logging.addLevelName(5, "TRACE") +logging.Logger.trace = trace + +RESOURCE = "`test resource`" + + +class MergeTestsTestcase(unittest.TestCase): + def test_patch_invalid_instruction(self): + source = {"container": {"b": "y", "$patch": "merge"}} + target = {"container": {"a": "x"}} + + with self.assertRaises(ValueError): + merge_instrs, normalized = extract_merge_instructions(source, RESOURCE) + + def test_patch_dict_replace(self): + source = {"container": {"b": "y", "$patch": "replace"}} + target = {"container": {"a": "x"}} + + merge_instrs, normalized = extract_merge_instructions(source, RESOURCE) + + apply_merge_instructions(merge_instrs, normalized, target, self, RESOURCE) + + self.assertDictEqual(target, {"container": {"b": "y"}}) + + def test_patch_list_replace(self): + source = {"container": [{"b": "y"}, {"$patch": "replace"}]} + target = {"container": [{"a": "x"}]} + + merge_instrs, normalized = extract_merge_instructions(source, RESOURCE) + + apply_merge_instructions(merge_instrs, normalized, target, self, RESOURCE) + + self.assertDictEqual(target, {"container": [{"b": "y"}]}) + + def test_patch_dict_delete(self): + source = {"container1": {"container2": {"$patch": "delete"}}} + target = {"container1": {"container2": {"a": "x"}}} + + merge_instrs, normalized = extract_merge_instructions(source, RESOURCE) + + apply_merge_instructions(merge_instrs, normalized, target, self, RESOURCE) + + self.assertDictEqual(target, {"container1": {"container2": None}}) + + def test_patch_list_delete(self): + source = {"container1": {"container2": [{"$patch": "delete", "a": "x"}]}} + target = {"container1": {"container2": [{"b": "y"}, {"a": "x"}]}} + + merge_instrs, normalized = extract_merge_instructions(source, RESOURCE) + + apply_merge_instructions(merge_instrs, normalized, target, self, RESOURCE) + + self.assertDictEqual(target, {"container1": {"container2": [{"b": "y"}]}}) + + def test_delete_primitive_list(self): + source = {"container1": {"container2": {"$deleteFromPrimitiveList/finalizers": ["a", "b", "d"]}}} + target = {"container1": {"container2": {"finalizers": ["a", "b", "c"]}}} + + merge_instrs, normalized = extract_merge_instructions(source, RESOURCE) + + apply_merge_instructions(merge_instrs, normalized, target, self, RESOURCE) + + self.assertDictEqual(target, {"container1": {"container2": {"finalizers": ["c"]}}}) + + def debug(self, msg, *args): + self._log("DEBUG", msg, *args) + + def trace(self, msg, *args): + self._log("TRACE", msg, *args) + + def warning(self, msg, *args): + self._log("WARNING", msg, *args) + + def _log(self, lvl, msg, *args): + print(lvl, msg % args)