Skip to content

Commit

Permalink
Revert "Remove while loop from check_service"
Browse files Browse the repository at this point in the history
This reverts commit 46d2863.
  • Loading branch information
Zlopez committed Sep 25, 2024
1 parent 052e630 commit 5cabe6d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 21 deletions.
42 changes: 25 additions & 17 deletions anitya/check_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"""

import logging
from concurrent.futures import ThreadPoolExecutor, wait
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from threading import Lock
from time import sleep
Expand Down Expand Up @@ -189,6 +189,8 @@ def run(self):
self.clear_counters()
queue = self.construct_queue(time)
total_count = len(queue)
projects_left = len(queue)
projects_iter = iter(queue)

if not queue:
return
Expand All @@ -199,26 +201,32 @@ def run(self):
futures = {}
pool_size = config.get("CRON_POOL")
timeout = config.get("CHECK_TIMEOUT")
for i in range(0, len(queue), pool_size):
with ThreadPoolExecutor(pool_size) as pool:
# Wait till every project in chunk is checked
for project in queue[i : i + pool_size]:
with ThreadPoolExecutor(pool_size) as pool:
# Wait till every project in queue is checked
while projects_left:
for project in projects_iter:
future = pool.submit(self.update_project, project)
futures[future] = project
if len(futures) > pool_size:
break # limit job submissions

# Wait for jobs that aren't completed yet
(done, not_done) = wait(futures, timeout=timeout)
for future in done:
# log any exception
if future.exception():
try:
future.result()
except Exception as e:
_log.exception(e)
for future in not_done:
future.cancel()
# Don't actually kill the threads, they will be killed by finishing
# the with statement
try:
for future in as_completed(futures, timeout=timeout):
projects_left -= 1 # one project down

# log any exception
if future.exception():
try:
future.result()
except Exception as e:
_log.exception(e)

del futures[future]

break # give a chance to add more jobs
except TimeoutError:
projects_left -= 1
_log.info("Thread was killed because the execution took too long.")
with self.error_counter_lock:
self.error_counter += 1
Expand Down
5 changes: 2 additions & 3 deletions anitya/tests/test_check_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
"""

import unittest
from concurrent.futures import Future
from datetime import timedelta
from unittest import mock

Expand Down Expand Up @@ -383,8 +382,8 @@ def test_run_small_pool_size(self, mock_check_project_release):
self.assertEqual(len(run_objects), 1)
self.assertEqual(run_objects[0].total_count, 2)

@mock.patch("anitya.check_service.wait", return_value=([], [Future(), Future()]))
def test_run_timeout(self, mock_wait):
@mock.patch("anitya.check_service.as_completed", side_effect=TimeoutError())
def test_run_timeout(self, mock_as_completed):
"""
Assert that TimeoutError is thrown when TIMEOUT is reached.
"""
Expand Down
1 change: 0 additions & 1 deletion news/1806.dev

This file was deleted.

0 comments on commit 5cabe6d

Please sign in to comment.