
Support task-failed event for batch tasks #83

Open
Anghille opened this issue Oct 2, 2023 · 9 comments
Labels
enhancement New feature or request

Comments

@Anghille

Anghille commented Oct 2, 2023

I open this issue to discuss / work on the task-failed events. For information, using events enables us (developers) to monitor our Celery batch apps using tools such as Prometheus (in combination with a CeleryExporter, for example).

Therefore, supporting most (if not all) of the events used by Celery would be a huge benefit.
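
For context, this is roughly how such an event would be consumed on the monitoring side. A minimal sketch using Celery's event receiver API; the broker URL is a placeholder and the handler just prints instead of updating an exporter metric:

from celery import Celery

app = Celery(broker="amqp://guest@localhost//")  # placeholder broker URL

def on_task_failed(event):
    # A real exporter would increment a failure counter here.
    print(f"task-failed: uuid={event['uuid']} exception={event.get('exception')}")

with app.connection() as connection:
    receiver = app.events.Receiver(connection, handlers={
        "task-failed": on_task_failed,
        "*": lambda event: None,  # ignore all other events
    })
    receiver.capture(limit=None, timeout=None, wakeup=True)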

@Anghille
Author

Anghille commented Oct 2, 2023

For now, I tried adding a custom callback function to the flush() method:

def flush(...):
    # ...
    def on_failure(_request: Optional[Any]) -> None:
        # The callback argument is unused; send a task-failed event
        # for every request in the flushed batch.
        for request in requests:
            request.send_event("task-failed")
    # ...

    return self._pool.apply_async(
        apply_batches_task,
        (self, serializable_requests, 0, None),
        accept_callback=on_accepted,
        callback=on_return,
        on_failure=on_failure,
    )
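
If the pool rejects an on_failure keyword: the hook that the worker itself passes to pool.apply_async (in execute_using_pool) is error_callback, so the wiring might instead look like this untested sketch:

    return self._pool.apply_async(
        apply_batches_task,
        (self, serializable_requests, 0, None),
        accept_callback=on_accepted,
        callback=on_return,
        error_callback=on_failure,  # the pool invokes this on failure (typically with an ExceptionInfo)
    )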

And added a receiver in trace.py:

# ...
finally:
    try:
        if postrun_receivers:
            send_postrun(
                sender=task,
                task_id=task_id,
                task=task,
                args=args,
                kwargs={},
                retval=result,
                state=state,
            )
        elif signals.task_failure.receivers:
            signals.task_failure.send(
                sender=task,
                task_id=task_id,
                result=None,
                args=args,
                kwargs={},
                state=states.FAILURE,
                exc=TypeError('Exception'),
            )
    finally:
        pop_task()
        pop_request()

In the pytest suite, I added a new failure() Celery task which calls mark_as_failure() on the backend. Right now, the task is still in the PENDING state when I try to assert result_1.state == states.FAILURE, and obviously assert_calls() fails as well.
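
One way to see whether mark_as_failure actually reached the result backend is to inspect the stored metadata directly. A debugging sketch, where app is assumed to be the test Celery app and task_id the original request id:

from celery.result import AsyncResult

res = AsyncResult(task_id, app=app)
print(res.state)  # PENDING means the backend has stored no metadata for this id yet
print(app.backend.get_task_meta(task_id))  # raw stored metadata, if any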

@clokep
Owner

clokep commented Oct 2, 2023

See also #33, #70.

Overall it sounds like this approach is fairly reasonable? Sounds like you're just running into an issue with the tests? Would you mind putting up a PR for further discussion?

@clokep added the enhancement label on Oct 2, 2023
@clokep
Owner

clokep commented Oct 2, 2023

Would you mind putting up a PR for further discussion?

Oh, looks like this is #82. 🎉

@Anghille
Author

Anghille commented Oct 4, 2023

I might have found why task-failed events are not sent:

In task.py, if the task fails, as in this custom task I created for the unit tests:

# In t/integration/tasks.py
from typing import List

from celery import shared_task

from celery_batches import Batches, SimpleRequest


@shared_task(base=Batches, flush_every=2, flush_interval=0.1)
def failed_add(requests: List[SimpleRequest]) -> None:
    """
    Mark each request as failed in the result backend, then raise.
    """
    from celery import current_app
    for request in requests:
        current_app.backend.mark_as_failure(request.id, exc=Exception())
        raise Exception()

# In test_batches.py
def test_failures(celery_app: Celery, celery_worker: TestWorkController) -> None:
    """Ensure that Celery signals run for the batch task."""
    # Configure a SignalCounter for each task signal.
    checks = (
        (signals.task_failure, 1),
    )
    signal_counters = []
    for sig, expected_count in checks:
        counter = SignalCounter(expected_count)
        sig.connect(counter)
        signal_counters.append(counter)

    # Only a single call is made; with flush_every=2 the batch is instead
    # flushed once flush_interval expires.
    result_1 = failed_add.delay(1)

    # Let the worker work.
    _wait_for_ping()

    for _ in range(10):  # Try up to 10 times with a short delay between attempts
        if result_1.state == states.FAILURE:
            break
        sleep(0.1)
    else:
        pytest.fail(f"Task state is {result_1.state}, expected {states.FAILURE}")

    for counter in signal_counters:
        counter.assert_calls()

The send_postrun is never sent because the postrun receiver doesn't exist. It seems that a custom failure signal must be sent from within the exception handler itself:

"""Trace task execution.

This module defines how the task execution is traced:
errors are recorded, handlers are applied and so on.

Mimics some of the functionality found in celery.app.trace.trace_task.
"""
from typing import TYPE_CHECKING, Any, List, Tuple

from celery import signals, states
from celery._state import _task_stack
from celery.app.task import Context
from celery.utils.log import get_logger
from kombu.utils.uuid import uuid
if TYPE_CHECKING:
    from celery_batches import Batches, SimpleRequest

logger = get_logger(__name__)

send_prerun = signals.task_prerun.send
send_postrun = signals.task_postrun.send
send_success = signals.task_success.send
SUCCESS = states.SUCCESS
FAILURE = states.FAILURE


def apply_batches_task(
    task: "Batches", args: Tuple[List["SimpleRequest"]], loglevel: int, logfile: None
) -> Any:
    request_stack = task.request_stack
    push_request = request_stack.push
    pop_request = request_stack.pop
    push_task = _task_stack.push
    pop_task = _task_stack.pop

    prerun_receivers = signals.task_prerun.receivers
    postrun_receivers = signals.task_postrun.receivers
    success_receivers = signals.task_success.receivers

    # Corresponds to multiple requests, so generate a new UUID.
    task_id = uuid()

    push_task(task)
    task_request = Context(loglevel=loglevel, logfile=logfile)
    push_request(task_request)

    try:
        # -*- PRE -*-
        if prerun_receivers:
            send_prerun(sender=task, task_id=task_id, task=task, args=args, kwargs={})

        # -*- TRACE -*-
        try:
            result = task(*args)
            state = SUCCESS
        except Exception as exc:
            result = None
            state = FAILURE
            logger.error("Error: %r", exc, exc_info=True)
            signals.task_failure.send( # <=== This is added to send the FAILURE signal because postrun is never true
                sender=task,
                task_id=task_id,
                state=state,
                task=task,
                args=args,
                kwargs={}
            )
        else:
            if success_receivers:
                send_success(sender=task, result=result)
    finally:
        try:
            if postrun_receivers:  # <==== This is never true when an exception occurs,
                send_postrun(  # <== so the FAILURE state is never sent via postrun.
                    sender=task,
                    task_id=task_id,
                    task=task,
                    args=args,
                    kwargs={},
                    retval=result,
                    state=state,
                )
        finally:
            pop_task()
            pop_request()

    return result

What do you think about it?
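
For reference, this is the kind of receiver the proposed task_failure send above would reach; a minimal sketch for observing it from a test or an app:

from celery import signals

@signals.task_failure.connect
def log_batch_failure(sender=None, task_id=None, exception=None, **kwargs):
    # With the change above, this fires once per flushed batch that raises.
    print(f"task_failure from {sender!r}: task_id={task_id} exception={exception!r}")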

@clokep
Owner

clokep commented Oct 4, 2023

It is very difficult to track the changes being made this way -- can you please push a branch so that we can view the diff easily?

@Anghille
Author

Anghille commented Oct 5, 2023

Sure thing

@Anghille
Author

Anghille commented Oct 6, 2023

I have made a new branch on my forked repository, if you want to check it out: https://github.com/Anghille/celery-batches/tree/failed

@Anghille
Author

Do you have any idea how events could be implemented?
Not signals, but the events themselves. I tried adding an EventDispatcher() and other kinds of event senders, but I cannot get those events whatever I try to do :(

@clokep
Owner

clokep commented Oct 11, 2023

Do you have any idea How events could be implemented?

I think you can just call task.on_foo(...) and it should do the proper logic; we essentially need to re-implement part of execute_using_pool (flush is essentially celery-batches' version of that).

So I think calling req.on_failure for each Request is what is needed.
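
A rough, untested sketch of that idea inside flush()'s failure path, assuming the buffered worker Request objects (not the SimpleRequests) are still in scope as requests, that the error path hands the callback the raised exception, and that Request.on_failure accepts an ExceptionInfo as it does in the worker:

from billiard.einfo import ExceptionInfo

def on_failure(exc: Exception) -> None:
    # Build an ExceptionInfo from the raised exception, as the worker does.
    try:
        raise exc
    except Exception:
        einfo = ExceptionInfo()
    for request in requests:
        # Mirrors the worker's per-task failure handling; send_failed_event=True
        # makes each request emit its own task-failed event.
        request.on_failure(einfo, send_failed_event=True)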


Note that I would really recommend finishing up the stuff that works first. I.e. making #82 just about the task_received signal.
