Skip to content

Commit

Permalink
Check status messages when resources fail to become ready (#180)
Browse files Browse the repository at this point in the history
* Refactor the resource waiter to use Resource object

* Log status conditions for not ready resources
  • Loading branch information
bsquizz authored Feb 10, 2022
1 parent 4d1cd03 commit 98ef773
Showing 1 changed file with 146 additions and 50 deletions.
196 changes: 146 additions & 50 deletions bonfire/openshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,18 +408,96 @@ def _check_status_for_restype(restype, json_data):
return _check_status_condition(status, "ready", "true")

elif restype == "cyndipipeline":
return _check_status_condition(status, "valid", "true") and status.get("activeTableName")
return (
_check_status_condition(status, "valid", "true")
and status.get("activeTableName") is not None
)

elif restype == "xjoinpipeline":
return _check_status_condition(status, "valid", "true") and status.get("activeIndexName")
return (
_check_status_condition(status, "valid", "true")
and status.get("activeIndexName") is not None
)


def _get_resource_info(item):
    """Return ``(kind, restype, name, key)`` for a resource's JSON dict.

    ``kind`` is the lower-cased ``item["kind"]``, ``restype`` is whatever
    ``_get_name_for_kind`` maps that kind to, ``name`` is
    ``item["metadata"]["name"]`` and ``key`` is ``"<restype>/<name>"``.
    """
    lowered_kind = item["kind"].lower()
    resource_type = _get_name_for_kind(lowered_kind)
    resource_name = item["metadata"]["name"]
    return lowered_kind, resource_type, resource_name, f"{resource_type}/{resource_name}"
class Resource:
    """Lazy wrapper around the JSON document of a single cluster resource.

    A Resource is created either from already-fetched JSON (``data``) or from
    a ``restype``/``name`` pair (optionally with a ``namespace``), in which
    case the JSON is fetched via the module-level ``get_json`` the first time
    it is needed.

    When the resource was created from ``restype``/``name`` and the fetch
    comes back empty (presumably because the resource does not exist yet --
    verify against ``get_json``), the identifying properties fall back to the
    constructor arguments and ``ready`` is ``False`` instead of raising
    ``KeyError``. This keeps error-reporting paths from masking the original
    failure.
    """

    def __init__(self, restype=None, name=None, namespace=None, data=None):
        """Store the lookup arguments and/or the pre-fetched JSON.

        Raises:
            ValueError: if neither ``data`` nor both ``restype`` and ``name``
                are provided.
        """
        if not data and not (restype and name):
            raise ValueError("Resource must be instantiated with restype/name or data")

        self._restype = restype
        self._name = name
        self._namespace = namespace
        self._data = data

    def get_json(self):
        """Re-fetch the resource JSON, cache it, and return it."""
        # inside a method the bare name resolves to the module-level get_json()
        self._data = get_json(self._restype, name=self._name, namespace=self._namespace)
        return self._data

    @property
    def data(self):
        """Cached resource JSON; fetched on access while the cache is empty."""
        if not self._data:
            self.get_json()
        return self._data

    @property
    def kind(self):
        """Lower-cased ``kind`` field; raises ``KeyError`` if no data is available."""
        return self.data["kind"].lower()

    @property
    def restype(self):
        """Resource type derived from ``kind``, or the constructor value if no data."""
        if not self.data:
            return self._restype
        return _get_name_for_kind(self.kind)

    @property
    def name(self):
        """``metadata.name``, or the constructor value if no data is available."""
        data = self.data
        if not data:
            return self._name
        return data["metadata"]["name"]

    @property
    def namespace(self):
        """``metadata.namespace``, or the constructor value if no data is available."""
        data = self.data
        if not data:
            return self._namespace
        return data["metadata"]["namespace"]

    @property
    def key(self):
        """Identifier of the form ``"<restype>/<name>"``."""
        return f"{self.restype}/{self.name}"

    @property
    def uid(self):
        """``metadata.uid``; raises ``KeyError`` if no data is available."""
        return self.data["metadata"]["uid"]

    @property
    def ready(self):
        """Result of ``_check_status_for_restype``; ``False`` when there is no data."""
        data = self.data
        if not data:
            return False
        return _check_status_for_restype(self.restype, data)

    @property
    def status_conditions(self):
        """List of ``"<type>: <status>"`` strings, one per status condition.

        The condition's ``message`` (or, failing that, its ``reason``) is
        appended in parentheses when present. A missing or null ``status`` or
        ``conditions`` field yields an empty list.
        """
        # `or` guards against fields that are present but explicitly null
        status = (self.data or {}).get("status") or {}
        conditions = status.get("conditions") or []

        status_conditions = []
        for condition in conditions:
            txt = f"{condition.get('type')}: {condition.get('status')}"
            msg = condition.get("message") or condition.get("reason")
            if msg:
                txt += f" ({msg})"
            status_conditions.append(txt)
        return status_conditions


def _get_details(resources):
details = []
for r in resources:
if not r.ready:
detail_msg = f"\n * {r.key} not ready"
if r.status_conditions:
detail_msg += ", status conditions:\n{}".format(
"\n".join([f" - {s}" for s in r.status_conditions])
)
details.append(detail_msg)
return details


class ResourceWaiter:
Expand All @@ -428,8 +506,8 @@ def __init__(self, namespace, restype, name):
self.restype = parse_restype(restype)
self.name = name.lower()
self.observed_resources = dict()
self._uid = None
self.key = f"{self.restype}/{self.name}"
self.resource = None
self._time_last_logged = None
self._time_remaining = None

Expand All @@ -438,21 +516,23 @@ def __init__(self, namespace, restype, name):
f"unable to check status of '{self.restype}' resources on this cluster"
)

def _observe(self, item):
_, restype, _, key = _get_resource_info(item)
if key not in self.observed_resources:
self.observed_resources[key] = {"ready": False}
if not self.observed_resources[key]["ready"]:
if _check_status_for_restype(restype, item):
log.info("[%s] resource is ready!", key)
self.observed_resources[key]["ready"] = True
def _observe(self, resource):
key = resource.key
if key in self.observed_resources:
already_observed_resource = self.observed_resources[key]
if already_observed_resource.ready:
# so we don't keep logging 'resource is ready!' every time we loop
return

self.observed_resources[key] = resource
if resource.ready:
log.info("[%s] resource is ready!", key)

def check_ready(self):
response = get_json(self.restype, name=self.name, namespace=self.namespace)
if response:
self._uid = response["metadata"]["uid"]
self._observe(response)
return all([r["ready"] is True for _, r in self.observed_resources.items()])
self.resource = Resource(self.restype, self.name, self.namespace)
if self.resource.get_json():
self._observe(self.resource)
return all([r.ready is True for _, r in self.observed_resources.items()])
return False

def _check_with_periodic_log(self):
Expand Down Expand Up @@ -487,39 +567,49 @@ def wait_for_ready(self, timeout, reraise=False):
if reraise:
raise
except (TimeoutException, TimedOutError):
log.error("[%s] timed out waiting for resource to be ready", self.key)
# log a "bulleted list" of the not ready resources and their status conditions
msg = f"[{self.key}] timed out waiting for resource to be ready"
details = _get_details([r for _, r in self.observed_resources.items()])
if details:
msg += ", details: {}\n".format("\n".join(details))
log.error(msg)

if reraise:
raise
return False


class ResourceOwnerWaiter(ResourceWaiter):
def _update_observed_resources(self, item):
for owner_ref in item["metadata"].get("ownerReferences", []):
def _update_observed_resources(self, resource):
for owner_ref in resource.data["metadata"].get("ownerReferences", []):
restype_matches = owner_ref["kind"].lower() == self.restype
owner_uid_matches = owner_ref["uid"] == self._uid
owner_uid_matches = owner_ref["uid"] == self.resource.uid
if restype_matches and owner_uid_matches:
_, restype, _, resource_key = _get_resource_info(item)
if resource_key not in self.observed_resources:
self.observed_resources[resource_key] = {"ready": False}
log.info(
"[%s] found owned resource %s",
self.key,
resource_key,
)

# this resource is owned by "self"
if resource.key in self.observed_resources:
already_observed_resource = self.observed_resources[resource.key]
if already_observed_resource.ready:
# so we don't keep logging 'resource is ready!' every time we loop
return
else:
if not resource.ready:
log.info(
"[%s] found owned resource %s, not yet ready",
self.key,
resource.key,
)

self.observed_resources[resource.key] = resource
# check if ready state has transitioned for this resource
if not self.observed_resources[resource_key]["ready"]:
if _check_status_for_restype(restype, item):
log.info("[%s] owned resource %s is ready!", self.key, resource_key)
self.observed_resources[resource_key]["ready"] = True
if resource.ready:
log.info("[%s] owned resource %s is ready!", self.key, resource.key)

def _observe(self, item):
super()._observe(item)
def _observe(self, resource):
super()._observe(resource)
for restype in _available_checkable_resources():
response = get_json(restype, namespace=self.namespace)
for item in response.get("items", []):
self._update_observed_resources(item)
self._update_observed_resources(Resource(data=item))


def wait_for_ready(namespace, restype, name, timeout=600):
Expand All @@ -541,7 +631,7 @@ def wait_for_ready_threaded(waiters, timeout=600):
all_failed_resources = set()
for waiter in waiters:
waiter_failed_resources = [
key for key, val in waiter.observed_resources.items() if val["ready"] is False
k for k, r in waiter.observed_resources.items() if r.ready is False
]
for failed_resource in waiter_failed_resources:
all_failed_resources.add(failed_resource)
Expand Down Expand Up @@ -595,9 +685,9 @@ def _all_resources_ready(namespace, timeout):
for restype in _resources_for_ns_wait():
response = get_json(restype, namespace=namespace)
for item in response.get("items", []):
_, restype, name, resource_key = _get_resource_info(item)
if resource_key not in already_waited_on:
waiter = ResourceWaiter(namespace, restype, name)
resource = Resource(data=item)
if resource.key not in already_waited_on:
waiter = ResourceWaiter(resource.namespace, resource.restype, resource.name)
waiters.append(waiter)

return wait_for_ready_threaded(waiters, timeout)
Expand Down Expand Up @@ -739,7 +829,7 @@ def get_all_namespaces():


def wait_on_cji(namespace, cji_name, timeout):
# first wait for job associated with this CJI to appear
# wait for job associated with this CJI to appear
log.info("waiting for Job to appear owned by CJI '%s'", cji_name)

def _find_job():
Expand All @@ -749,9 +839,15 @@ def _find_job():
except (KeyError, IndexError):
return False

job_name, elapsed = wait_for(
_find_job, num_sec=timeout, message=f"wait for Job to appear owned by CJI '{cji_name}'"
)
cji = Resource("clowdjobinvocation", cji_name, namespace)
try:
job_name, elapsed = wait_for(
_find_job, num_sec=timeout, message=f"wait for Job to appear owned by CJI '{cji_name}'"
)
except TimedOutError:
if not cji.ready:
log.error("[%s] not ready, details: %s\n", cji.key, "\n".join(_get_details([cji])))
raise

log.info(
"found Job '%s' created by CJI '%s', now waiting for pod to appear", job_name, cji_name
Expand Down

0 comments on commit 98ef773

Please sign in to comment.