Skip to content

Commit

Permalink
explicitly drain events when gossip/heartbeat will not - fix for cele…
Browse files Browse the repository at this point in the history
…ry#1847

kudos for @sabw8217 who's fix i copied verbatim.
  • Loading branch information
bryanhelmig authored and ask committed Sep 25, 2015
1 parent b1e628e commit 0308ce6
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 0 deletions.
4 changes: 4 additions & 0 deletions celery/tests/worker/test_loops.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ def add(x, y):
return x + y
self.add = add

def test_drain_after_consume(self):
x, _ = get_task_callback(self.app)
x.connection.drain_events.assert_called_with()

def test_setup_heartbeat(self):
x = X(self.app, heartbeat=10)
x.hub.call_repeatedly = Mock(name='x.hub.call_repeatedly()')
Expand Down
5 changes: 5 additions & 0 deletions celery/worker/loops.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
if not obj.restart_count and not obj.pool.did_start_ok():
raise WorkerLostError('Could not start worker processes')

# consumer.consume() may have prefetched up to our
# limit - drain an event so we are in a clean state
# prior to starting our event loop.
connection.drain_events()

# FIXME: Use loop.run_forever
# Tried and works, but no time to test properly before release.
hub.propagate_errors = errors
Expand Down

0 comments on commit 0308ce6

Please sign in to comment.