Skip to content

Commit

Permalink
Use a watcher thread when waiting on all resources to GET less (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
bsquizz authored Feb 11, 2022
1 parent 98ef773 commit 2f5a8e3
Showing 1 changed file with 108 additions and 53 deletions.
161 changes: 108 additions & 53 deletions bonfire/openshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,10 @@ def namespace(self):

@property
def key(self):
return f"{self.restype}/{self.name}"
if self._restype and self._name:
return f"{self._restype}/{self._name}"
else:
return f"{self.restype}/{self.name}"

@property
def uid(self):
Expand Down Expand Up @@ -500,22 +503,79 @@ def _get_details(resources):
return details


class ResourceWatcher(threading.Thread):
def __init__(self, namespace, *args, **kwargs):
super().__init__(*args, **kwargs)
self.daemon = True
self.namespace = namespace
self.resources = {}
self._stopped = threading.Event()

def run(self):
log.debug("starting resource watcher for namespace '%s'", self.namespace)
while not self._stopped.is_set():
found_keys = []
for restype in _available_checkable_resources():
response = get_json(restype, namespace=self.namespace)
for item in response.get("items", []):
r = Resource(data=item)
self.resources[r.key] = r
found_keys.append(r.key)
for key in list(self.resources.keys()):
if key not in found_keys:
del self.resources[key]
time.sleep(5)
log.debug("resource watcher stopped for namespace '%s'", self.namespace)

def stop(self):
self._stopped.set()


class ResourceWaiter:
def __init__(self, namespace, restype, name):
def __init__(self, namespace, restype, name, watch_owned=False, watcher=None):
self.namespace = namespace
self.restype = parse_restype(restype)
self.name = name.lower()
self.watch_owned = watch_owned
self.watcher = watcher
self.observed_resources = dict()
self.key = f"{self.restype}/{self.name}"
self.resource = None
self._time_last_logged = None
self._time_remaining = None

if self.watch_owned and not self.watcher:
raise ValueError("watcher must be specified if using watch_owned=True")

if self.restype not in _available_checkable_resources():
raise ValueError(
f"unable to check status of '{self.restype}' resources on this cluster"
)

def _check_owned_resources(self, resource):
for owner_ref in resource.data["metadata"].get("ownerReferences", []):
restype_matches = owner_ref["kind"].lower() == self.restype
owner_uid_matches = owner_ref["uid"] == self.resource.uid
if restype_matches and owner_uid_matches:
# this resource is owned by "self"
if resource.key in self.observed_resources:
already_observed_resource = self.observed_resources[resource.key]
if already_observed_resource.ready:
# so we don't keep logging 'resource is ready!' every time we loop
return
else:
if not resource.ready:
log.info(
"[%s] found owned resource %s, not yet ready",
self.key,
resource.key,
)

self.observed_resources[resource.key] = resource
# check if ready state has transitioned for this resource
if resource.ready:
log.info("[%s] owned resource %s is ready!", self.key, resource.key)

def _observe(self, resource):
key = resource.key
if key in self.observed_resources:
Expand All @@ -525,12 +585,23 @@ def _observe(self, resource):
return

self.observed_resources[key] = resource

if self.watch_owned:
for _, r in self.watcher.resources.items():
self._check_owned_resources(r)

if resource.ready:
log.info("[%s] resource is ready!", key)

def check_ready(self):
self.resource = Resource(self.restype, self.name, self.namespace)
if self.resource.get_json():
if self.watcher:
self.resource = self.watcher.resources.get(self.key)
data = self.resource.data if self.resource else None
else:
self.resource = Resource(self.restype, self.name, self.namespace)
data = self.resource.get_json()

if data:
self._observe(self.resource)
return all([r.ready is True for _, r in self.observed_resources.items()])
return False
Expand Down Expand Up @@ -579,39 +650,6 @@ def wait_for_ready(self, timeout, reraise=False):
return False


class ResourceOwnerWaiter(ResourceWaiter):
def _update_observed_resources(self, resource):
for owner_ref in resource.data["metadata"].get("ownerReferences", []):
restype_matches = owner_ref["kind"].lower() == self.restype
owner_uid_matches = owner_ref["uid"] == self.resource.uid
if restype_matches and owner_uid_matches:
# this resource is owned by "self"
if resource.key in self.observed_resources:
already_observed_resource = self.observed_resources[resource.key]
if already_observed_resource.ready:
# so we don't keep logging 'resource is ready!' every time we loop
return
else:
if not resource.ready:
log.info(
"[%s] found owned resource %s, not yet ready",
self.key,
resource.key,
)

self.observed_resources[resource.key] = resource
# check if ready state has transitioned for this resource
if resource.ready:
log.info("[%s] owned resource %s is ready!", self.key, resource.key)

def _observe(self, resource):
super()._observe(resource)
for restype in _available_checkable_resources():
response = get_json(restype, namespace=self.namespace)
for item in response.get("items", []):
self._update_observed_resources(Resource(data=item))


def wait_for_ready(namespace, restype, name, timeout=600):
waiter = ResourceWaiter(namespace, restype, name)
return waiter.wait_for_ready(timeout)
Expand Down Expand Up @@ -642,15 +680,21 @@ def wait_for_ready_threaded(waiters, timeout=600):
return True


def _all_resources_ready(namespace, timeout):
def _all_resources_ready(namespace, timeout, watcher):
already_waited_on = set()

# wait on ClowdEnvironment, if there's one using this ns as its targetNamespace
start = time.time()

clowd_env = find_clowd_env_for_ns(namespace)
if clowd_env:
waiter = ResourceOwnerWaiter(namespace, "clowdenvironment", clowd_env["metadata"]["name"])
waiter = ResourceWaiter(
namespace,
"clowdenvironment",
clowd_env["metadata"]["name"],
watch_owned=True,
watcher=watcher,
)
if not waiter.wait_for_ready(timeout):
return False

Expand All @@ -667,7 +711,9 @@ def _all_resources_ready(namespace, timeout):
waiters = []
clowdapps = get_json("clowdapp", namespace=namespace)
for clowdapp in clowdapps["items"]:
waiter = ResourceOwnerWaiter(namespace, "clowdapp", clowdapp["metadata"]["name"])
waiter = ResourceWaiter(
namespace, "clowdapp", clowdapp["metadata"]["name"], watch_owned=True, watcher=watcher
)
waiters.append(waiter)
if not wait_for_ready_threaded(waiters, timeout):
return False
Expand All @@ -682,26 +728,35 @@ def _all_resources_ready(namespace, timeout):

# wait on anything else not covered by the above
waiters = []
for restype in _resources_for_ns_wait():
response = get_json(restype, namespace=namespace)
for item in response.get("items", []):
resource = Resource(data=item)
if resource.key not in already_waited_on:
waiter = ResourceWaiter(resource.namespace, resource.restype, resource.name)
waiters.append(waiter)
for k, r in watcher.resources.items():
if r.restype in _resources_for_ns_wait() and r.key not in already_waited_on:
waiter = ResourceWaiter(
r.namespace,
r.restype,
r.name,
watch_owned=True,
watcher=watcher,
)
waiters.append(waiter)

return wait_for_ready_threaded(waiters, timeout)


def wait_for_all_resources(namespace, timeout=600):
# wrap the other wait_fors in 1 wait_for so overall timeout is honored
# wait_for returns a tuple of the return code and the time taken
wait_for(
_all_resources_ready,
func_args=(namespace, timeout),
message="wait for all deployed resources to be ready",
timeout=timeout,
)
watcher = ResourceWatcher(namespace)
watcher.start()

try:
wait_for(
_all_resources_ready,
func_args=(namespace, timeout, watcher),
message="wait for all deployed resources to be ready",
timeout=timeout,
)
finally:
watcher.stop()


def wait_for_db_resources(namespace, timeout=600):
Expand Down

0 comments on commit 2f5a8e3

Please sign in to comment.