Skip to content

Commit

Permalink
Added a test to verify that second order replace works as expected. F…
Browse files Browse the repository at this point in the history
…ixes celery#3116.
  • Loading branch information
Omer Katz committed Dec 5, 2017
1 parent d02d260 commit 3b330d9
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 2 deletions.
30 changes: 29 additions & 1 deletion t/integration/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from time import sleep

from celery import group, shared_task
from celery import chain, group, shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)
Expand Down Expand Up @@ -74,3 +74,31 @@ def redis_echo(message):

redis_connection = StrictRedis()
redis_connection.rpush('redis-echo', message)


@shared_task(bind=True)
def second_order_replace1(self, state=False):
from redis import StrictRedis

redis_connection = StrictRedis()
if not state:
redis_connection.rpush('redis-echo', 'In A')
new_task = chain(second_order_replace2.s(),
second_order_replace1.si(state=True))
raise self.replace(new_task)
else:
redis_connection.rpush('redis-echo', 'Out A')


@shared_task(bind=True)
def second_order_replace2(self, state=False):
from redis import StrictRedis

redis_connection = StrictRedis()
if not state:
redis_connection.rpush('redis-echo', 'In B')
new_task = chain(redis_echo.s("In/Out C"),
second_order_replace2.si(state=True))
raise self.replace(new_task)
else:
redis_connection.rpush('redis-echo', 'Out B')
23 changes: 22 additions & 1 deletion t/integration/test_canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from celery.result import AsyncResult, GroupResult

from .conftest import flaky
from .tasks import add, add_replaced, add_to_all, collect_ids, ids, redis_echo
from .tasks import (add, add_replaced, add_to_all, collect_ids, ids,
redis_echo, second_order_replace1)

TIMEOUT = 120

Expand Down Expand Up @@ -58,6 +59,26 @@ def test_group_chord_group_chain(self, manager):
assert set(redis_messages[4:]) == after_items
redis_connection.delete('redis-echo')

@flaky
def test_second_order_replace(self, manager):
from celery.five import bytes_if_py2

if not manager.app.conf.result_backend.startswith('redis'):
raise pytest.skip('Requires redis result backend.')

redis_connection = StrictRedis()
redis_connection.delete('redis-echo')

result = second_order_replace1.delay()
result.get(timeout=TIMEOUT)
redis_messages = list(map(
bytes_if_py2,
redis_connection.lrange('redis-echo', 0, -1)
))

expected_messages = [b'In A', b'In B', b'In/Out C', b'Out B', b'Out A']
assert redis_messages == expected_messages

@flaky
def test_parent_ids(self, manager, num=10):
assert manager.inspect().ping()
Expand Down

0 comments on commit 3b330d9

Please sign in to comment.