Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix app status when deleted event #5

Merged
merged 3 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ export TOKEN_API_ENDPOINT=<end point for fetching token> (optional, is set to BA
export APP_STATUS_API_ENDPOINT=<end point for status updates> (optional, is set to BASE_URL + "/api/v1/app-status/" if not defined)
```

To retrieve additional log messages, set:

```bash
export DEBUG=True
export TEST_LOG_STREAM=sys.stdout
```

### Running the Service
Navigate to the project directory and execute the following command to run the service:

Expand Down
42 changes: 33 additions & 9 deletions serve_event_listener/status_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,17 @@ def set_k8s_api_client(self, k8s_api_client: client.CoreV1Api, namespace: str):
self.k8s_api_client = k8s_api_client
self.namespace = namespace

def fetch_status_from_k8s_api(self, release: str) -> Tuple[str, str, str]:
def fetch_status_from_k8s_api(
self, release: str, response_limit: int = 1000
) -> Tuple[str, str, str]:
"""
Get the actual status of a release from k8s via the client API.
Because this can be a costly operation, it is only used at critical times, such as when pods are deleted.

Parameters:
- release (str): The release
- response_limit (int): The maximum number of objects to return from the k8s API call.

Returns:
- Tuple[str, str, str]: The status of the pod, container message, pod message

Expand All @@ -141,7 +147,7 @@ def fetch_status_from_k8s_api(self, release: str) -> Tuple[str, str, str]:

try:
api_response = self.k8s_api_client.list_namespaced_pod(
self.namespace, limit=500, timeout_seconds=120, watch=False
self.namespace, limit=response_limit, timeout_seconds=120, watch=False
)

for pod in api_response.items:
Expand Down Expand Up @@ -185,21 +191,27 @@ def update(self, event: dict) -> None:
- status_data (dict): Updated dictionary containing status info.
- release (str): The release of the updated status
"""
logger.debug("Event triggered update_status_data")

pod = event.get("object", None)

# TODO: Try catch here instead
if pod:
release = pod.metadata.labels.get("release")

logger.info(
f"--- Event triggered update status data from release {release}"
)

status_object = pod.status

status, container_message, pod_message = (
StatusData.determine_status_from_k8s(status_object)
)
release = pod.metadata.labels.get("release")

logger.debug(f"Event triggered from release {release}")
logger.debug(f"Status: {status} - Message: {container_message}")
logger.debug(
f"Pod status converted to AppStatus={status}, \
ContMessage:{container_message}, \
PodMessage:{pod_message}"
)

creation_timestamp = pod.metadata.creation_timestamp
deletion_timestamp = pod.metadata.deletion_timestamp
Expand Down Expand Up @@ -268,29 +280,41 @@ def update_or_create_status(
Dict: Updated status data.
"""

log_msg = ""
if release in status_data:
log_msg = f"Status data before update:{status_data[release]}"
else:
log_msg = "Release not in status_data. Adding now."

logger.debug(
f"Release {release}. Status data before update:{status_data}. \
{(release in status_data)=}? \
f"Release {release}. {log_msg} \
creation_timestamp={creation_timestamp}, deletion_timestamp={deletion_timestamp}"
)

if (
release not in status_data
or creation_timestamp >= status_data[release]["creation_timestamp"]
or deletion_timestamp is not None
):

status = "Deleted" if deletion_timestamp else status

if status == "Deleted":
# Status Deleted is a destructive action
# Therefore we double-check the k8s status directly upon detecting this
if self.k8s_api_client is None:
logger.warning("No k8s API client: k8s_api_client is None")

if self.k8s_api_client:
# Only use if the k8s client api has been set
# Unit tests for example do not currently set a k8s api
status, *_ = self.fetch_status_from_k8s_api(release)
logger.debug(f"Fetched release status from k8s: {status}")

if status is None:
# No pod with this release found. Set status to Deleted
status = "Deleted"
logger.info("k8s returned status None. Setting to Deleted")

if status != "Deleted":
deletion_timestamp = None
Expand Down
12 changes: 11 additions & 1 deletion serve_event_listener/status_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ def add(self, status_data):
self.queue.put(status_data)

def process(self):
log_cnt_q_is_empty = 0

while not self.stop_event.is_set():
try:
status_data = self.queue.get(timeout=2) # Wait for 2 seconds
Expand All @@ -46,8 +48,16 @@ def process(self):
logger.debug(
f"Processed queue successfully of release {release}, new status={new_status}"
)
log_cnt_q_is_empty = 0
except queue.Empty:
pass # Continue looping if the queue is empty
if log_cnt_q_is_empty <= 2:
logger.debug("Nothing to do. The queue is empty.")
elif log_cnt_q_is_empty == 3:
logger.debug(
"Nothing to do. The queue is empty. Suppressing this message for now."
)
log_cnt_q_is_empty += 1
# pass # Continue looping if the queue is empty

def stop_processing(self):
logger.warning("Queue processing stopped")
Expand Down
38 changes: 37 additions & 1 deletion tests/test_status_data.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,26 @@
import logging
import os
import sys
import time
import unittest

from serve_event_listener.status_data import StatusData
from tests.create_pods import Pod, PodStatus

# Set up logging output for unit test execution.
# DEBUG accepts "true", "1" or "t" (case-insensitive) to enable debug-level
# output; any other value (or unset) selects INFO.
DEBUG = os.getenv("DEBUG", default="False").lower() in ("true", "1", "t")
level = logging.DEBUG if DEBUG else logging.INFO

# TEST_LOG_STREAM holds a Python expression naming the stream to log to,
# e.g. "sys.stdout". When it is unset, empty, or evaluates to None, log
# records are discarded via a NullHandler.
TEST_LOG_STREAM = os.environ.get("TEST_LOG_STREAM", None)

# NOTE(review): eval() executes arbitrary code taken from the environment.
# This is tolerable only because it runs in a developer-controlled test
# session; consider an explicit name-to-stream mapping instead.
# The expression is evaluated exactly once so that any side effects it has
# are not repeated and the checked value is the one actually used.
_log_stream = eval(TEST_LOG_STREAM) if TEST_LOG_STREAM else None

if _log_stream is not None:
    logging.basicConfig(stream=_log_stream, level=level)
else:
    logging.basicConfig(handlers=[logging.NullHandler()], level=level)


class TestPodProcessing(unittest.TestCase):
release = "some_release"
Expand Down Expand Up @@ -94,14 +111,21 @@ def test_replica_scenario(self):

self.assertEqual(self.status_data.status_data[release].get("status"), "Deleted")

@unittest.skip(
"This test no longer works after we rely on k8s truth for deletions."
)
def test_valid_and_invalid_image_edits(self):
"""
This scenario creates a pod, then creates a pod with an invalid image, and finally
it created a pod with a valid image.
it creates a pod with a valid image.
After the third pod is created, the first two are deleted.
Finally the valid pod is also deleted.
This occurs when a user changes the image to an invalid image and then to a valid image.
"""

# TODO: Consider re-enabling this test by, for example, creating a parallel data structure
# containing a list of k8s pods and statuses.

release = "r-valid-invalid-images"

# Pod: pod
Expand Down Expand Up @@ -168,6 +192,18 @@ def test_valid_and_invalid_image_edits(self):
ts valid_pod created={self.valid_pod.metadata.creation_timestamp}, {msg}",
)

# Finally also delete the valid pod
self.valid_pod.delete()
self.status_data.update({"object": self.valid_pod})

time.sleep(0.01)

self.assertEqual(
self.status_data.status_data[release].get("status"),
"Deleted",
"Release should be Deleted after delete of the last, valid pod.",
)


class TestStatusConverter(unittest.TestCase):
"""Verifies the translation logic of k8s status objects to app status codes.
Expand Down
Loading