Skip to content

Commit

Permalink
refactor(celery): deprecate poll_task (#393)
Browse files Browse the repository at this point in the history
  • Loading branch information
ocervell authored May 6, 2024
1 parent 7d74ae8 commit feacf55
Showing 1 changed file with 4 additions and 52 deletions.
56 changes: 4 additions & 52 deletions secator/celery.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import gc
import logging
import traceback
from time import sleep

from celery import Celery, chain, chord, signals
from celery.app import trace
from celery.result import AsyncResult, allow_join_result
from celery.result import allow_join_result
# from pyinstrument import Profiler # TODO: make pyinstrument optional
from rich.logging import RichHandler

Expand Down Expand Up @@ -354,56 +353,9 @@ def forward_results(results):
results = deduplicate(results, attr='_uuid')
return results


#---------------------#
# Celery result utils #
#---------------------#


def poll_task(result, seen=[]):
"""Poll Celery result tree recursively to get results live.
TODO: function is incomplete, as it does not parse all results.
Args:
result (Union[AsyncResult, GroupResult]): Celery result object.
seen (list): List of seen results (do not yield again).
Yields:
dict: Result.
"""
if result is None:
return

if result.children:
for child in result.children:
yield from poll_task(child, seen=seen)
else:
res = AsyncResult(result.id)
if not res.info:
sleep(0.1)
yield from poll_task(result, seen=seen)

# Task done running
if isinstance(res.info, list):
for item in res.info:
if item._uuid not in seen:
yield res.id, None, item
seen.append(item._uuid)
return

# Get task partial results, remove duplicates
results = res.info['results']
name = res.info['name']
for item in results:
if item._uuid not in seen:
yield res.id, name, item
seen.append(item._uuid)

# Task still running, keep polling
if not res.ready():
sleep(0.1)
yield from poll_task(result, seen=seen)
#--------------#
# Celery utils #
#--------------#


def is_celery_worker_alive():
Expand Down

0 comments on commit feacf55

Please sign in to comment.