From e3b973d90dd445e0f6299dbd82c50d388aa2e841 Mon Sep 17 00:00:00 2001 From: Derek Harland Date: Fri, 6 Apr 2018 16:29:14 +1200 Subject: [PATCH] Prevent ResultSet from invoking backend.remove_pending_result (fixes #4539) (#4540) * Prevent ResultSet from invoking backend.remove_pending_result in favour of GroupResult * Add's test to ensure that ResultSet.get operates correctly * Fix flake warnings and errors in test_canvas --- CONTRIBUTORS.txt | 3 ++- celery/result.py | 5 ++++- t/integration/test_canvas.py | 12 +++++++++++- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt index 92144df83ba..52781833a99 100644 --- a/CONTRIBUTORS.txt +++ b/CONTRIBUTORS.txt @@ -257,4 +257,5 @@ Mikhail Wolfson, 2017/12/11 Alex Garel, 2018/01/04 RĂ©gis Behmo 2018/01/20 Igor Kasianov, 2018/01/20 -Chris Mitchell, 2018/02/27 +Derek Harland, 2018/02/15 +Chris Mitchell, 2018/02/27 \ No newline at end of file diff --git a/celery/result.py b/celery/result.py index 6c9db30ea23..874f3afddcd 100644 --- a/celery/result.py +++ b/celery/result.py @@ -487,7 +487,6 @@ def add(self, result): self._on_full.add(result) def _on_ready(self): - self.backend.remove_pending_result(self) if self.backend.is_async: self._cache = [r.get() for r in self.results] self.on_ready() @@ -845,6 +844,10 @@ def __init__(self, id=None, results=None, parent=None, **kwargs): self.parent = parent ResultSet.__init__(self, results, **kwargs) + def _on_ready(self): + self.backend.remove_pending_result(self) + ResultSet._on_ready(self) + def save(self, backend=None): """Save group-result for later retrieval using :meth:`restore`. diff --git a/t/integration/test_canvas.py b/t/integration/test_canvas.py index ea8d522a670..c43d313749f 100644 --- a/t/integration/test_canvas.py +++ b/t/integration/test_canvas.py @@ -7,7 +7,7 @@ from celery import chain, chord, group from celery.exceptions import TimeoutError -from celery.result import AsyncResult, GroupResult +from celery.result import AsyncResult, GroupResult, ResultSet from .conftest import flaky, get_redis_connection from .tasks import (add, add_chord_to_chord, add_replaced, add_to_all, @@ -186,6 +186,16 @@ def test_chain_error_handler_with_eta(self, manager): assert result == 10 +class test_result_set: + + @flaky + def test_result_set(self, manager): + assert manager.inspect().ping() + + rs = ResultSet([add.delay(1, 1), add.delay(2, 2)]) + assert rs.get(timeout=TIMEOUT) == [2, 4] + + class test_group: @flaky