Skip to content

Commit

Permalink
add the ability to purge a specific queue of tasks (#4727)
Browse files Browse the repository at this point in the history
* add the ability to purge a specific queue of tasks
  • Loading branch information
lcouzens authored Oct 9, 2023
1 parent 94f9c79 commit 9b6b6f5
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 14 deletions.
15 changes: 11 additions & 4 deletions koku/masu/api/running_celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,25 @@ def clear_celery_queues(request):
if request.method == "GET":
params = request.query_params
clear_all = params.get("clear_all")
LOG.info(f"Clearing all queues parameter: {clear_all}")

# clear default_celery queue
purged_tasks = app.control.purge()
queue = params.get("queue")

# clear all queues
if clear_all:
# clear default_celery queue
purged_tasks = app.control.purge()
LOG.info(f"Clearing all queues parameter: {clear_all}")
queue_lengths = list(collect_queue_metrics().values())
purged_tasks += sum(queue_lengths)
r = redis.Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DB)
r.flushall()

if queue:
LOG.info(f"Clearing tasks from {queue} queue")
r = redis.Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DB)
queue_lengths = collect_queue_metrics().get(queue)
purged_tasks += queue_lengths
r.delete(queue)

LOG.info(f"Celery purged tasks: {purged_tasks}")
return Response({"purged_tasks": purged_tasks})

Expand Down
27 changes: 17 additions & 10 deletions koku/masu/test/api/test_running_celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,6 @@ def test_celery_queue_lengths(self, mock_collect, _):
response = self.client.get(reverse("celery_queue_lengths"))
self.assertEqual(response.status_code, 200)

@patch("koku.middleware.MASU", return_value=True)
@patch("masu.api.running_celery_tasks.app")
def test_clear_celery_queues_default(self, mock_celery, _):
"""Test the GET of clear_celery_queues endpoint."""
mock_celery.control.purge.return_value = 0
response = self.client.get(reverse("clear_celery_queues"))
mock_celery.control.purge.assert_called_once()
self.assertEqual(response.status_code, 200)
self.assertIn("purged_tasks", response.data)

@patch("koku.middleware.MASU", return_value=True)
@patch("masu.api.running_celery_tasks.app")
@patch("masu.api.running_celery_tasks.collect_queue_metrics")
Expand All @@ -86,6 +76,23 @@ def test_clear_celery_queues_clear_all(self, mock_redis, mock_collect, mock_cele
mock_collect.assert_called_once()
mock_redis.flushall.assert_called_once()

@patch("koku.middleware.MASU", return_value=True)
@patch("masu.api.running_celery_tasks.collect_queue_metrics")
@patch("masu.api.running_celery_tasks.redis")
def test_clear_queue(self, mock_redis, mock_collect, _):
"""Test the GET of clear_celery_queues endpoint with specific queue."""
expected_key = "purged_tasks"
expected_queue_lengths = {"priority": 2}
mock_collect.return_value = expected_queue_lengths
mock_redis = mock_redis.Redis.return_value
url = reverse("clear_celery_queues") + "?queue=priority"
response = self.client.get(url)
body = response.json()
self.assertEqual(response.status_code, 200)
self.assertIn(expected_key, body)
mock_collect.assert_called_once()
mock_redis.delete.assert_called_once()

@patch("koku.middleware.MASU", return_value=True)
@patch("masu.api.running_celery_tasks.get_celery_queue_items")
def test_celery_queue_tasks(self, mock_queue, _):
Expand Down

0 comments on commit 9b6b6f5

Please sign in to comment.