Skip to content

Commit

Permalink
Prevent ResultSet from invoking backend.remove_pending_result (fixes c…
Browse files Browse the repository at this point in the history
…elery#4539) (celery#4540)

* Prevent ResultSet from invoking backend.remove_pending_result in favour of GroupResult

* Add's test to ensure that ResultSet.get operates correctly

* Fix flake warnings and errors in test_canvas
  • Loading branch information
donkopotamus authored and auvipy committed Apr 6, 2018
1 parent d300169 commit e3b973d
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 3 deletions.
3 changes: 2 additions & 1 deletion CONTRIBUTORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,5 @@ Mikhail Wolfson, 2017/12/11
Alex Garel, 2018/01/04
Régis Behmo 2018/01/20
Igor Kasianov, 2018/01/20
Chris Mitchell, 2018/02/27
Derek Harland, 2018/02/15
Chris Mitchell, 2018/02/27
5 changes: 4 additions & 1 deletion celery/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,6 @@ def add(self, result):
self._on_full.add(result)

def _on_ready(self):
self.backend.remove_pending_result(self)
if self.backend.is_async:
self._cache = [r.get() for r in self.results]
self.on_ready()
Expand Down Expand Up @@ -845,6 +844,10 @@ def __init__(self, id=None, results=None, parent=None, **kwargs):
self.parent = parent
ResultSet.__init__(self, results, **kwargs)

def _on_ready(self):
self.backend.remove_pending_result(self)
ResultSet._on_ready(self)

def save(self, backend=None):
"""Save group-result for later retrieval using :meth:`restore`.
Expand Down
12 changes: 11 additions & 1 deletion t/integration/test_canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from celery import chain, chord, group
from celery.exceptions import TimeoutError
from celery.result import AsyncResult, GroupResult
from celery.result import AsyncResult, GroupResult, ResultSet

from .conftest import flaky, get_redis_connection
from .tasks import (add, add_chord_to_chord, add_replaced, add_to_all,
Expand Down Expand Up @@ -186,6 +186,16 @@ def test_chain_error_handler_with_eta(self, manager):
assert result == 10


class test_result_set:

@flaky
def test_result_set(self, manager):
assert manager.inspect().ping()

rs = ResultSet([add.delay(1, 1), add.delay(2, 2)])
assert rs.get(timeout=TIMEOUT) == [2, 4]


class test_group:

@flaky
Expand Down

0 comments on commit e3b973d

Please sign in to comment.