From f2001cadcbf291b1546d39ea7ca04777442b1ef5 Mon Sep 17 00:00:00 2001 From: Brandon Squizzato <35474886+bsquizz@users.noreply.github.com> Date: Mon, 24 May 2021 12:48:53 -0400 Subject: [PATCH] More intelligent waiting for ClowdEnvironment/ClowdApp resources (#53) * Create OwnerWaiter to wait on resource which owns other resources * Add get_api_resources * Watch status for observed resources in ResourceOwnerWaiter * Fix ClowdEnvironment status check * Rewrite resource wait logic * Wait on any remaining resources --- bonfire/bonfire.py | 80 ++++--- bonfire/namespaces.py | 2 +- bonfire/openshift.py | 472 ++++++++++++++++++++++-------------------- 3 files changed, 302 insertions(+), 252 deletions(-) diff --git a/bonfire/bonfire.py b/bonfire/bonfire.py index 68f83ec0..c60f08b9 100755 --- a/bonfire/bonfire.py +++ b/bonfire/bonfire.py @@ -7,6 +7,7 @@ import warnings from tabulate import tabulate +from wait_for import TimedOutError import bonfire.config as conf from bonfire.qontract import get_apps_for_env, sub_refs @@ -135,11 +136,9 @@ def _get_target_namespace(duration, retries, namespace=None): def _wait_on_namespace_resources(namespace, timeout, db_only=False): if db_only: - time_taken = wait_for_db_resources(namespace, timeout) + wait_for_db_resources(namespace, timeout) else: - time_taken = wait_for_all_resources(namespace, timeout) - if time_taken >= timeout: - _error("Timed out waiting for resources; exiting") + wait_for_all_resources(namespace, timeout) def _prepare_namespace(namespace): @@ -427,7 +426,11 @@ def _cmd_namespace_release(namespace): @options(_timeout_option) def _cmd_namespace_wait_on_resources(namespace, timeout, db_only): """Wait for rolled out resources to be ready in namespace""" - _wait_on_namespace_resources(namespace, timeout, db_only=db_only) + try: + _wait_on_namespace_resources(namespace, timeout, db_only=db_only) + except TimedOutError as err: + log.error("Hit timeout error: %s", err) + _error("namespace wait timed out") @namespace.command("prepare", hidden=True) @@ -604,6 +607,16 @@ def _cmd_config_deploy( clowd_env = match["metadata"]["name"] log.debug("inferred clowd_env: '%s'", clowd_env) + def _err_handler(): + try: + if not no_release_on_fail and not requested_ns and used_ns_reservation_system: + # if we auto-reserved this ns, auto-release it on failure unless + # --no-release-on-fail was requested + log.info("releasing namespace '%s'", ns) + release_namespace(ns) + finally: + _error("deploy failed") + try: log.info("processing app templates...") apps_config = _process( @@ -626,20 +639,19 @@ def _cmd_config_deploy( else: log.info("applying app configs...") apply_config(ns, apps_config) - log.info("waiting on resources...") + log.info("waiting on resources for max of %dsec...", timeout) _wait_on_namespace_resources(ns, timeout) - except (Exception, KeyboardInterrupt): + except KeyboardInterrupt: + log.error("Aborted by keyboard interrupt!") + _err_handler() + except TimedOutError as err: + log.error("Hit timeout error: %s", err) + _err_handler() + except Exception: log.exception("hit unexpected error!") - try: - if not no_release_on_fail and not requested_ns and used_ns_reservation_system: - # if we auto-reserved this ns, auto-release it on failure unless - # --no-release-on-fail was requested - log.info("releasing namespace '%s'", ns) - release_namespace(ns) - finally: - _error("deploy failed") + _err_handler() else: - log.info("successfully deployed to %s", ns) + log.info("successfully deployed to namespace '%s'", ns) click.echo(ns) @@ -667,21 +679,33 @@ def _cmd_process_clowdenv(namespace, clowd_env, template_file): @options(_timeout_option) def _cmd_deploy_clowdenv(namespace, clowd_env, template_file, timeout): """Process ClowdEnv template and deploy to a cluster""" - clowd_env_config = _process_clowdenv(namespace, clowd_env, template_file) - - log.debug("ClowdEnvironment config:\n%s", clowd_env_config) - - apply_config(None, clowd_env_config) + try: + clowd_env_config = _process_clowdenv(namespace, clowd_env, template_file) - if not namespace: - # wait for Clowder to tell us what target namespace it created - namespace = wait_for_clowd_env_target_ns(clowd_env) + log.debug("ClowdEnvironment config:\n%s", clowd_env_config) - _wait_on_namespace_resources(namespace, timeout) + apply_config(None, clowd_env_config) - clowd_env_name = find_clowd_env_for_ns(namespace)["metadata"]["name"] - log.info("ClowdEnvironment '%s' using ns '%s' is ready", clowd_env_name, namespace) - print(namespace) + if not namespace: + # wait for Clowder to tell us what target namespace it created + namespace = wait_for_clowd_env_target_ns(clowd_env) + + log.info("waiting on resources for max of %dsec...", timeout) + _wait_on_namespace_resources(namespace, timeout) + + clowd_env_name = find_clowd_env_for_ns(namespace)["metadata"]["name"] + except KeyboardInterrupt: + log.error("Aborted by keyboard interrupt!") + _error("deploy failed") + except TimedOutError as err: + log.error("Hit timeout error: %s", err) + _error("deploy failed") + except Exception: + log.exception("hit unexpected error!") + _error("deploy failed") + else: + log.info("ClowdEnvironment '%s' using ns '%s' is ready", clowd_env_name, namespace) + click.echo(namespace) @config.command("write-default") diff --git a/bonfire/namespaces.py b/bonfire/namespaces.py index 82f2b5e1..794944db 100644 --- a/bonfire/namespaces.py +++ b/bonfire/namespaces.py @@ -374,7 +374,7 @@ def add_base_resources(namespace, secret_names): oc("apply", f="-", _in=json.dumps(processed_template)) # wait for any deployed base resources to become 'ready' - wait_for_all_resources(namespace, timeout=conf.RECONCILE_TIMEOUT, wait_on_app=False) + wait_for_all_resources(namespace, timeout=conf.RECONCILE_TIMEOUT) def _reconcile_ns(ns, base_secret_names): diff --git a/bonfire/openshift.py b/bonfire/openshift.py index 8b670b0a..bc897180 100644 --- a/bonfire/openshift.py +++ b/bonfire/openshift.py @@ -1,6 +1,7 @@ import functools import json import logging +import re import threading import time @@ -14,51 +15,50 @@ log = logging.getLogger(__name__) -# Resource types and their cli shortcuts -# Mostly listed here: https://docs.openshift.com/online/cli_reference/basic_cli_operations.html -SHORTCUTS = { - "build": None, - "buildconfig": "bc", - "daemonset": "ds", - "deployment": "deploy", - "deploymentconfig": "dc", - "event": "ev", - "imagestream": "is", - "imagestreamtag": "istag", - "imagestreamimage": "isimage", - "job": None, - "limitrange": "limits", - "namespace": "ns", - "node": "no", - "pod": "po", - "project": "project", - "resourcequota": "quota", - "replicationcontroller": "rc", - "secrets": "secret", - "service": "svc", - "serviceaccount": "sa", - "statefulset": "sts", - "persistentvolume": "pv", - "persistentvolumeclaim": "pvc", - "configmap": "cm", - "replicaset": "rs", - "route": None, - "clowdenvironment": None, - "clowdapp": None, -} + +# assume that the result of this will not change during execution of our app +@functools.lru_cache(maxsize=None, typed=False) +def get_api_resources(): + output = oc("api-resources", verbs="list", _silent=True).strip() + if not output: + return [] + + lines = output.split("\n") + # lines[0] is the table header, use it to figure out length of each column + groups = re.findall(r"(\w+\s+)", lines[0]) + + name_start = 0 + name_end = len(groups[0]) + shortnames_start = name_end + shortnames_end = name_end + len(groups[1]) + apigroup_start = shortnames_end + apigroup_end = shortnames_end + len(groups[2]) + namespaced_start = apigroup_end + namespaced_end = apigroup_end + len(groups[3]) + kind_start = namespaced_end + + resources = [] + for line in lines[1:]: + shortnames = line[shortnames_start:shortnames_end].strip() + resource = { + "name": line[name_start:name_end].strip().rstrip("s") or None, + "shortnames": shortnames.split(",") if shortnames else [], + "apigroup": line[apigroup_start:apigroup_end].strip() or None, + "namespaced": line[namespaced_start:namespaced_end].strip() == "true", + "kind": line[kind_start:].strip() or None, + } + resources.append(resource) + return resources def parse_restype(string): """ Given a resource type or its shortcut, return the full resource type name. """ - string_lower = string.lower() - if string_lower in SHORTCUTS: - return string_lower - - for resource_name, shortcut in SHORTCUTS.items(): - if string_lower == shortcut: - return resource_name + s = string.lower() + for r in get_api_resources(): + if s in r["shortnames"] or s == r["name"]: + return r["name"] raise ValueError("Unknown resource type: {}".format(string)) @@ -193,7 +193,7 @@ def oc(*args, **kwargs): log.warning("Non-zero return code ignored") -# we will assume that 'oc whoami' will not change during execution of a single 'bonfire' command +# we will assume that 'oc whoami' will not change during execution @functools.lru_cache(maxsize=None, typed=False) def whoami(): name = oc("whoami", _silent=True).strip() @@ -262,7 +262,36 @@ class StatusError(Exception): pass -_CHECKABLE_RESOURCES = ["deploymentconfig", "deployment", "statefulset", "daemonset"] +# resources we are able to parse the status of +_CHECKABLE_RESOURCES = [ + "deploymentconfig", + "deployment", + "statefulset", + "daemonset", + "clowdapp", + "clowdenvironment", + "kafka", + "kafkaconnect", +] + + +def _available_checkable_resources(namespaced=False): + """Returns resources we are able to parse status of that are present on the cluster.""" + if namespaced: + return [ + r["name"] + for r in get_api_resources() + if r["name"] in _CHECKABLE_RESOURCES and r["namespaced"] + ] + + return [r["name"] for r in get_api_resources() if r["name"] in _CHECKABLE_RESOURCES] + + +def _get_name_for_kind(kind): + for r in get_api_resources(): + if r["kind"].lower() == kind.lower(): + return r["name"] + raise ValueError(f"unable to find resource name for kind '{kind}'") def _check_status_for_restype(restype, json_data): @@ -286,14 +315,17 @@ def _check_status_for_restype(restype, json_data): if not status: return False + generation = json_data["metadata"].get("generation") + status_generation = status.get("observedGeneration") or status.get("generation") + if generation and status_generation and generation != status_generation: + return False + if restype == "deploymentconfig" or restype == "deployment": spec_replicas = json_data["spec"]["replicas"] available_replicas = status.get("availableReplicas", 0) updated_replicas = status.get("updatedReplicas", 0) - unavailable_replicas = status.get("unavailableReplicas", 1) - if unavailable_replicas == 0: - if available_replicas == spec_replicas and updated_replicas == spec_replicas: - return True + if available_replicas == spec_replicas and updated_replicas == spec_replicas: + return True elif restype == "statefulset": spec_replicas = json_data["spec"]["replicas"] @@ -309,203 +341,207 @@ def _check_status_for_restype(restype, json_data): if status.get("phase").lower() == "running": return True + elif restype in ("clowdenvironment", "clowdapp"): + return str(status.get("ready")).lower() == "true" + + elif restype in ("kafka", "kafkaconnect"): + conditions = status.get("conditions", []) + for c in conditions: + if str(c.get("status")).lower() == "true" and c.get("type").lower() == "ready": + return True -def _wait_with_periodic_status_check(namespace, timeout, key, restype, name): - """Check if resource is ready using _check_status_for_restype, periodically log an update.""" - time_last_logged = time.time() - time_remaining = timeout - def _ready(): - nonlocal time_last_logged, time_remaining +def _get_resource_info(item): + kind = item["kind"].lower() + restype = _get_name_for_kind(kind) + name = item["metadata"]["name"] + key = f"{restype}/{name}" + return kind, restype, name, key + + +class ResourceWaiter: + def __init__(self, namespace, restype, name): + self.namespace = namespace + self.restype = parse_restype(restype) + self.name = name.lower() + self.observed_resources = dict() + self._uid = None + self.key = f"{self.restype}/{self.name}" + self._time_last_logged = None + self._time_remaining = None + + if self.restype not in _available_checkable_resources(): + raise ValueError( + f"unable to check status of '{self.restype}' resources on this cluster" + ) - j = get_json(restype, name, namespace=namespace) - if _check_status_for_restype(restype, j): + def _observe(self, item): + _, restype, _, key = _get_resource_info(item) + if key not in self.observed_resources: + self.observed_resources[key] = {"ready": False} + if not self.observed_resources[key]["ready"]: + if _check_status_for_restype(restype, item): + log.info("[%s] resource is ready!", key) + self.observed_resources[key]["ready"] = True + + def check_ready(self): + response = get_json(self.restype, name=self.name, namespace=self.namespace) + self._uid = response["metadata"]["uid"] + self._observe(response) + return all([r["ready"] is True for _, r in self.observed_resources.items()]) + + def _check_with_periodic_log(self): + if self.check_ready(): return True - if time.time() > time_last_logged + 60: - time_remaining -= 60 - if time_remaining: - log.info("[%s] waiting %dsec longer", key, time_remaining) - time_last_logged = time.time() + if time.time() > self._time_last_logged + 60: + self._time_remaining -= 60 + if self._time_remaining: + log.info("[%s] waiting %dsec longer", self.key, self._time_remaining) + self._time_last_logged = time.time() return False - wait_for( - _ready, - timeout=timeout, - delay=5, - message="wait for '{}' to be ready".format(key), - ) - - -def wait_for_ready(namespace, restype, name, timeout=300, _result_dict=None): - """ - Wait {timeout} for resource to be complete/ready/active. - - Args: - restype: type of resource, which can be "build", "dc", "deploymentconfig" - name: name of resource - timeout: time in secs to wait for resource to become ready - - Returns: - True if ready, - False if timed out - - '_result_dict' can be passed when running this in a threaded fashion - to store the result of this wait as: - _result_dict[resource_name] = True or False - """ - restype = parse_restype(restype) - key = "{}/{}".format(SHORTCUTS.get(restype) or restype, name) - - if _result_dict is None: - _result_dict = dict() - _result_dict[key] = False - - log.info("[%s] waiting up to %dsec for resource to be ready", key, timeout) - - try: - # Do not use rollout status for statefulset/daemonset yet until we can handle - # https://github.com/kubernetes/kubernetes/issues/64500 - if restype in ["deployment", "deploymentconfig"]: - # use oc rollout status for the applicable resource types - oc( - "rollout", - "status", - key, - namespace=namespace, - _timeout=timeout, - _stdout_log_prefix=f"[{key}] ", - _stderr_log_prefix=f"[{key}] ", - ) - else: - _wait_with_periodic_status_check(namespace, timeout, key, restype, name) - - log.info("[%s] is ready!", key) - _result_dict[key] = True - return True - except (StatusError, ErrorReturnCode) as err: - log.error("[%s] hit error waiting for resource to be ready: %s", key, str(err)) - except (TimeoutException, TimedOutError): - log.error("[%s] timed out waiting for resource to be ready", key) - return False - + def wait_for_ready(self, timeout): + self._time_last_logged = time.time() + self._time_remaining = timeout -def wait_for_ready_threaded(namespace, restype_name_list, timeout=300): - """ - Wait for multiple delpoyments in a threaded fashion. + try: + # check for ready initially, only wait_for if we need to + log.debug("[%s] checking if 'ready'", self.key) + if not self.check_ready(): + log.info("[%s] waiting up to %dsec for resource to be 'ready'", self.key, timeout) + wait_for( + self._check_with_periodic_log, + message=f"wait for {self.key} to be 'ready'", + delay=5, + timeout=timeout, + ) + return True + except (StatusError, ErrorReturnCode) as err: + log.error("[%s] hit error waiting for resource to be ready: %s", self.key, str(err)) + except (TimeoutException, TimedOutError): + log.error("[%s] timed out waiting for resource to be ready", self.key) + return False - Args: - restype_name_list: list of tuples with (resource_type, resource_name,) - timeout: timeout for each thread - Returns: - True if all deployments are ready - False if any failed - """ - result_dict = dict() +class ResourceOwnerWaiter(ResourceWaiter): + def _update_observed_resources(self, item): + for owner_ref in item["metadata"].get("ownerReferences", []): + restype_matches = owner_ref["kind"].lower() == self.restype + owner_uid_matches = owner_ref["uid"] == self._uid + if restype_matches and owner_uid_matches: + _, restype, _, resource_key = _get_resource_info(item) + if resource_key not in self.observed_resources: + self.observed_resources[resource_key] = {"ready": False} + log.info( + "[%s] found owned resource %s", + self.key, + resource_key, + ) + + # check if ready state has transitioned for this resource + if not self.observed_resources[resource_key]["ready"]: + if _check_status_for_restype(restype, item): + log.info("[%s] owned resource %s is ready!", self.key, resource_key) + self.observed_resources[resource_key]["ready"] = True + + def _observe(self, item): + super()._observe(item) + for restype in _available_checkable_resources(): + response = get_json(restype, namespace=self.namespace) + for item in response.get("items", []): + self._update_observed_resources(item) + + +def wait_for_ready(namespace, restype, name, timeout=300): + waiter = ResourceWaiter(namespace, restype, name) + return waiter.wait_for_ready(timeout) + + +def wait_for_ready_threaded(waiters, timeout=300): threads = [ - threading.Thread( - target=wait_for_ready, args=(namespace, restype, name, timeout, result_dict) - ) - for restype, name in restype_name_list + threading.Thread(target=waiter.wait_for_ready, daemon=True, args=(timeout,)) + for waiter in waiters ] for thread in threads: - thread.daemon = True - thread.name = thread.name.lower() # because I'm picky + thread.name = thread.name.lower() thread.start() for thread in threads: thread.join() - failed = [key for key, result in result_dict.items() if not result] + all_failed_resources = set() + for waiter in waiters: + waiter_failed_resources = [ + key for key, val in waiter.observed_resources.items() if val["ready"] is False + ] + for failed_resource in waiter_failed_resources: + all_failed_resources.add(failed_resource) - if failed: - log.info("Some resources failed to become ready: %s", ", ".join(failed)) + if all_failed_resources: + log.info("some resources failed to become ready: %s", ", ".join(all_failed_resources)) return False return True -def _wait_for_resources(namespace, timeout, skip=None): - skip = skip or [] - wait_for_list = [] - for restype in _CHECKABLE_RESOURCES: - try: - resources = get_json(restype, namespace=namespace) - except ErrorReturnCode as err: - if "the server doesn't have a resource type" in str(err): - log.debug("server has no resources of type '%s', skipping wait for them", restype) - resources = {"items": []} - else: - raise - for item in resources["items"]: - entry = (restype, item["metadata"]["name"]) - if entry not in skip: - wait_for_list.append((restype, item["metadata"]["name"])) - - result = wait_for_ready_threaded(namespace, wait_for_list, timeout=timeout) - return result, wait_for_list - - -def _operator_resource_present(namespace, owner_kind): - response = get_json("deployment", namespace=namespace) - for item in response.get("items", []): - if item.get("metadata", {}).get("ownerReferences"): - if item["metadata"]["ownerReferences"][0]["kind"] == owner_kind: - return True - return False +def _all_resources_ready(namespace, timeout): + already_waited_on = set() + # wait on ClowdEnvironment + start = time.time() -def _wait_for_clowdapp_resources(namespace, timeout): - log.info("Waiting for resources owned by 'ClowdApp' to appear") - return wait_for( - _operator_resource_present, - func_args=(namespace, "ClowdApp"), - message="wait for ClowdApp-owned resources to appear", - timeout=timeout, - ) + clowd_env_name = find_clowd_env_for_ns(namespace)["metadata"]["name"] + waiter = ResourceOwnerWaiter(namespace, "clowdenvironment", clowd_env_name) + if not waiter.wait_for_ready(timeout): + return False + for key in waiter.observed_resources: + already_waited_on.add(key) -def _operator_resources(namespace, timeout, wait_on_app=True): - log.info("Waiting for resources owned by 'ClowdEnvironment' to appear in ns '%s'", namespace) - wait_for( - _operator_resource_present, - func_args=(namespace, "ClowdEnvironment"), - message="wait for ClowdEnvironment-owned resources to appear", - timeout=timeout, - ) - # now wait for everything in ns to be 'ready' - result, already_waited_on = _wait_for_resources(namespace, timeout) + end = time.time() + elapsed = end - start + timeout = int(timeout - elapsed) + + # wait on all ClowdApps + start = time.time() - # the first wait failed, so just return 'False' now - if not result: - return result + waiters = [] + clowdapps = get_json("clowdapp", namespace=namespace) + for clowdapp in clowdapps["items"]: + waiter = ResourceOwnerWaiter(namespace, "clowdapp", clowdapp["metadata"]["name"]) + waiters.append(waiter) + if not wait_for_ready_threaded(waiters, timeout): + return False + + for waiter in waiters: + for key in waiter.observed_resources: + already_waited_on.add(key) - if wait_on_app: - _wait_for_clowdapp_resources(namespace, timeout) - # now that ClowdApp resources showed up, again wait for everything new in ns to be 'ready' - result, _ = _wait_for_resources(namespace, timeout, already_waited_on) + end = time.time() + elapsed = end - start + timeout = int(timeout - elapsed) - return result + # wait on anything else not covered by the above + waiters = [] + for restype in _available_checkable_resources(namespaced=True): + response = get_json(restype, namespace=namespace) + for item in response.get("items", []): + _, restype, name, resource_key = _get_resource_info(item) + if resource_key not in already_waited_on: + waiter = ResourceWaiter(namespace, restype, name) + waiters.append(waiter) + return wait_for_ready_threaded(waiters, timeout) -def wait_for_all_resources(namespace, timeout=300, wait_on_app=True): + +def wait_for_all_resources(namespace, timeout=300): # wrap the other wait_fors in 1 wait_for so overall timeout is honored # wait_for returns a tuple of the return code and the time taken - if len(get_json("clowdapp", namespace=namespace).get("items", [])) == 0: - # only wait on ClowdApp if one was deployed - wait_on_app = False - - return_val, time_taken = wait_for( - _operator_resources, - func_args=(namespace, timeout, wait_on_app), + wait_for( + _all_resources_ready, + func_args=(namespace, timeout), message="wait for all deployed resources to be ready", timeout=timeout, ) - return time_taken - - -def _specific_clowdapp_resources(namespace, resources_to_wait_for, timeout): - _wait_for_clowdapp_resources(namespace, timeout) - return wait_for_ready_threaded(namespace, resources_to_wait_for, timeout=timeout) def wait_for_db_resources(namespace, timeout=300): @@ -513,30 +549,22 @@ def wait_for_db_resources(namespace, timeout=300): if len(clowdapps) == 0: raise ValueError(f"no clowdapps found in ns '{namespace}', no DB's to wait for") - resources_to_wait_for = set() + waiters = [] for clowdapp in clowdapps: clowdapp_name = clowdapp["metadata"]["name"] db_name = clowdapp["spec"].get("database", {}).get("name") if db_name: - resources_to_wait_for.add(("deployment", f"{clowdapp_name}-db")) + waiters.append(ResourceWaiter(namespace, "deployment", f"{clowdapp_name}-db")) shared_db_app_name = clowdapp["spec"].get("database", {}).get("sharedDbAppName") if shared_db_app_name: - resources_to_wait_for.add(("deployment", f"{shared_db_app_name}-db")) + waiters.append(ResourceWaiter(namespace, "deployment", f"{shared_db_app_name}-db")) - if not resources_to_wait_for: + if not waiters: raise ValueError( f"no clowdapps with db configurations found in '{namespace}', no DB's to wait for" ) - # wrap the other wait_fors in 1 wait_for so overall timeout is honored - _, time_taken = wait_for( - _specific_clowdapp_resources, - func_args=(namespace, resources_to_wait_for, timeout), - message="wait for db resources to be ready", - timeout=timeout, - ) - - return time_taken + wait_for_ready_threaded(waiters, timeout) def copy_namespace_secrets(src_namespace, dst_namespace, secret_names): @@ -611,11 +639,9 @@ def wait_for_clowd_env_target_ns(clowd_env_name): @functools.lru_cache(maxsize=None, typed=False) def on_k8s(): """Detect whether this is a k8s or openshift cluster based on existence of projects.""" - project_resources = oc( - "api-resources", "--api-group=project.openshift.io", o="name", _silent=True - ) + project_resource = [r for r in get_api_resources() if r["name"] == "project"] - if str(project_resources).strip(): + if project_resource: return False return True