Skip to content

Commit

Permalink
fix: allow processing messages from unknown queues
Browse files Browse the repository at this point in the history
  • Loading branch information
davidt99 committed Dec 4, 2024
1 parent a11a459 commit 110dc40
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
14 changes: 13 additions & 1 deletion dramatiq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,8 @@ def handle_message(self, message):
If the message has an eta, delay it. Otherwise, put it on the
work queue.
"""
if q_name(message.queue_name) != q_name(self.queue_name):
self.logger.warning("Received message for queue %r but message queue is %r.", self.queue_name, message.queue_name)
try:
if "eta" in message.options:
self.logger.debug("Pushing message %r onto delay queue.", message.message_id)
Expand Down Expand Up @@ -517,11 +519,21 @@ def process_message(self, message):
self.broker.emit_after("process_message", message, exception=e)

finally:
if not actor:
try:
actor = self.broker.get_actor(message.actor_name)
queue_name = actor.queue_name
except ActorNotFound:
# This shouldn't happen
queue_name = message.queue_name
else:
queue_name = actor.queue_name

# NOTE: There is no race here as any message that was
# processed must have come off of a consumer. Therefore,
# there has to be a consumer for that message's queue so
# this is safe. Probably.
self.consumers[message.queue_name].post_process_message(message)
self.consumers[queue_name].post_process_message(message)
self.work_queue.task_done()

# See discussion #351. Keeping a reference to the
Expand Down
53 changes: 53 additions & 0 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import dramatiq
from dramatiq.middleware import middleware
from .common import worker


Expand All @@ -11,3 +13,54 @@ def test_workers_dont_register_queues_that_arent_whitelisted(stub_broker):
# Then a consumer should not get spun up for that queue
assert "c" not in stub_worker.consumers
assert "c.DQ" not in stub_worker.consumers

def test_worker_can_process_messages_from_unknown_queues(stub_broker):
results = []

# Given that I have a broker with a default queue
stub_broker.declare_queue("default")

# And an actor
@dramatiq.actor
def do_work():
results.append(42)

message = do_work.message()
message = message.copy(queue_name="unknown")

# And I enqueue a message for an unknown queue onto the broker
stub_broker.queues["default"].put(message.encode())

# Given that I have a worker
with worker(stub_broker, queues={"default"}) as stub_worker:
# Then the worker should be able to process the message based on the actor name
stub_worker.join()
assert results == [42]

def test_worker_can_process_failed_messages_from_unknown_queues(stub_broker):
# Given that I have a broker with a default queue
stub_broker.declare_queue("default")

# And a middleware that fails all messages
class FailMiddleware(middleware.Middleware):
def before_process_message(self, broker, message):
message.fail()

stub_broker.add_middleware(FailMiddleware())

# And an actor
@dramatiq.actor
def do_work():
pass

some_message = do_work.message()
some_message = some_message.copy(queue_name="unknown")

# And I enqueue a message for an unknown queue onto the broker
stub_broker.queues["default"].put(some_message.encode())

# Given that I have a worker
with worker(stub_broker, queues={"default"}) as stub_worker:
# Then the worker should be able to process the message based on the actor name
stub_worker.join()
assert 1 == len(stub_broker.dead_letters_by_queue["default"])

0 comments on commit 110dc40

Please sign in to comment.