diff --git a/secator/celery.py b/secator/celery.py index 6f3f0e95..8ae28afb 100644 --- a/secator/celery.py +++ b/secator/celery.py @@ -1,11 +1,10 @@ import gc import logging import traceback -from time import sleep from celery import Celery, chain, chord, signals from celery.app import trace -from celery.result import AsyncResult, allow_join_result +from celery.result import allow_join_result # from pyinstrument import Profiler # TODO: make pyinstrument optional from rich.logging import RichHandler @@ -354,56 +353,9 @@ def forward_results(results): results = deduplicate(results, attr='_uuid') return results - -#---------------------# -# Celery result utils # -#---------------------# - - -def poll_task(result, seen=[]): - """Poll Celery result tree recursively to get results live. - - TODO: function is incomplete, as it does not parse all results. - - Args: - result (Union[AsyncResult, GroupResult]): Celery result object. - seen (list): List of seen results (do not yield again). - - Yields: - dict: Result. - """ - if result is None: - return - - if result.children: - for child in result.children: - yield from poll_task(child, seen=seen) - else: - res = AsyncResult(result.id) - if not res.info: - sleep(0.1) - yield from poll_task(result, seen=seen) - - # Task done running - if isinstance(res.info, list): - for item in res.info: - if item._uuid not in seen: - yield res.id, None, item - seen.append(item._uuid) - return - - # Get task partial results, remove duplicates - results = res.info['results'] - name = res.info['name'] - for item in results: - if item._uuid not in seen: - yield res.id, name, item - seen.append(item._uuid) - - # Task still running, keep polling - if not res.ready(): - sleep(0.1) - yield from poll_task(result, seen=seen) +#--------------# +# Celery utils # +#--------------# def is_celery_worker_alive():