From 3b330d975dc84e236a3539b75a21f5c1f9b22b5f Mon Sep 17 00:00:00 2001 From: Omer Katz Date: Tue, 5 Dec 2017 15:18:24 +0200 Subject: [PATCH] Added a test to verify that second order replace works as expected. Fixes #3116. --- t/integration/tasks.py | 30 +++++++++++++++++++++++++++++- t/integration/test_canvas.py | 23 ++++++++++++++++++++++- 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/t/integration/tasks.py b/t/integration/tasks.py index bf3a4083ab8..9c46d3c2ffb 100644 --- a/t/integration/tasks.py +++ b/t/integration/tasks.py @@ -3,7 +3,7 @@ from time import sleep -from celery import group, shared_task +from celery import chain, group, shared_task from celery.utils.log import get_task_logger logger = get_task_logger(__name__) @@ -74,3 +74,31 @@ def redis_echo(message): redis_connection = StrictRedis() redis_connection.rpush('redis-echo', message) + + +@shared_task(bind=True) +def second_order_replace1(self, state=False): + from redis import StrictRedis + + redis_connection = StrictRedis() + if not state: + redis_connection.rpush('redis-echo', 'In A') + new_task = chain(second_order_replace2.s(), + second_order_replace1.si(state=True)) + raise self.replace(new_task) + else: + redis_connection.rpush('redis-echo', 'Out A') + + +@shared_task(bind=True) +def second_order_replace2(self, state=False): + from redis import StrictRedis + + redis_connection = StrictRedis() + if not state: + redis_connection.rpush('redis-echo', 'In B') + new_task = chain(redis_echo.s("In/Out C"), + second_order_replace2.si(state=True)) + raise self.replace(new_task) + else: + redis_connection.rpush('redis-echo', 'Out B') diff --git a/t/integration/test_canvas.py b/t/integration/test_canvas.py index b82f0993403..02e126f0975 100644 --- a/t/integration/test_canvas.py +++ b/t/integration/test_canvas.py @@ -8,7 +8,8 @@ from celery.result import AsyncResult, GroupResult from .conftest import flaky -from .tasks import add, add_replaced, add_to_all, collect_ids, ids, redis_echo +from .tasks import (add, add_replaced, add_to_all, collect_ids, ids, + redis_echo, second_order_replace1) TIMEOUT = 120 @@ -58,6 +59,26 @@ def test_group_chord_group_chain(self, manager): assert set(redis_messages[4:]) == after_items redis_connection.delete('redis-echo') + @flaky + def test_second_order_replace(self, manager): + from celery.five import bytes_if_py2 + + if not manager.app.conf.result_backend.startswith('redis'): + raise pytest.skip('Requires redis result backend.') + + redis_connection = StrictRedis() + redis_connection.delete('redis-echo') + + result = second_order_replace1.delay() + result.get(timeout=TIMEOUT) + redis_messages = list(map( + bytes_if_py2, + redis_connection.lrange('redis-echo', 0, -1) + )) + + expected_messages = [b'In A', b'In B', b'In/Out C', b'Out B', b'Out A'] + assert redis_messages == expected_messages + @flaky def test_parent_ids(self, manager, num=10): assert manager.inspect().ping()