Skip to content

Commit

Permalink
fix: allow processing messages from unknown queues
Browse files Browse the repository at this point in the history
  • Loading branch information
davidt99 committed Dec 5, 2024
1 parent a11a459 commit 848a408
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 3 deletions.
13 changes: 10 additions & 3 deletions dramatiq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def stop(self, timeout=600000):
self.logger.debug("Requeueing in-memory messages...")
messages_by_queue = defaultdict(list)
for _, message in iter_queue(self.work_queue):
messages_by_queue[message.queue_name].append(message)
messages_by_queue[self.broker.get_actor(message.actor_name).queue_name].append(message)

for queue_name, messages in messages_by_queue.items():
try:
Expand Down Expand Up @@ -316,6 +316,8 @@ def handle_message(self, message):
If the message has an eta, delay it. Otherwise, put it on the
work queue.
"""
if q_name(message.queue_name) != q_name(self.queue_name):
self.logger.warning("Received message for queue %r but message queue is %r.", self.queue_name, message.queue_name)
try:
if "eta" in message.options:
self.logger.debug("Pushing message %r onto delay queue.", message.message_id)
Expand Down Expand Up @@ -478,12 +480,12 @@ def process_message(self, message):
"""
actor = None
try:
actor = self.broker.get_actor(message.actor_name)
self.logger.debug("Received message %s with id %r.", message, message.message_id)
self.broker.emit_before("process_message", message)

res = None
if not message.failed:
actor = self.broker.get_actor(message.actor_name)
res = actor(*message.args, **message.kwargs)
if res is not None \
and message.options.get("pipe_target") is None \
Expand Down Expand Up @@ -521,7 +523,12 @@ def process_message(self, message):
# processed must have come off of a consumer. Therefore,
# there has to be a consumer for that message's queue so
# this is safe. Probably.
self.consumers[message.queue_name].post_process_message(message)
queue_name = actor.queue_name if actor else message.queue_name
if queue_name in self.consumers:
self.consumers[queue_name].post_process_message(message)
else:
self.logger.error("No consumer for queue %r found.", queue_name)

self.work_queue.task_done()

# See discussion #351. Keeping a reference to the
Expand Down
55 changes: 55 additions & 0 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import dramatiq
from dramatiq.middleware import middleware
from .common import worker


Expand All @@ -11,3 +13,56 @@ def test_workers_dont_register_queues_that_arent_whitelisted(stub_broker):
# Then a consumer should not get spun up for that queue
assert "c" not in stub_worker.consumers
assert "c.DQ" not in stub_worker.consumers


def test_worker_can_process_messages_from_unknown_queues(stub_broker):
results = []

# Given that I have a broker with a default queue
stub_broker.declare_queue("default")

# And an actor
@dramatiq.actor
def do_work():
results.append(42)

message = do_work.message()
message = message.copy(queue_name="unknown")

# And I enqueue a message for an unknown queue onto the broker
stub_broker.queues["default"].put(message.encode())

# Given that I have a worker
with worker(stub_broker, queues={"default"}) as stub_worker:
# Then the worker should be able to process the message based on the actor name
stub_worker.join()
assert results == [42]


def test_worker_can_process_failed_messages_from_unknown_queues(stub_broker):
# Given that I have a broker with a default queue
stub_broker.declare_queue("default")

# And a middleware that fails all messages
class FailMiddleware(middleware.Middleware):
def before_process_message(self, broker, message):
message.fail()

stub_broker.add_middleware(FailMiddleware())

# And an actor
@dramatiq.actor
def do_work():
pass

some_message = do_work.message()
some_message = some_message.copy(queue_name="unknown")

# And I enqueue a message for an unknown queue onto the broker
stub_broker.queues["default"].put(some_message.encode())

# Given that I have a worker
with worker(stub_broker, queues={"default"}) as stub_worker:
# Then the worker should be able to process the message based on the actor name
stub_worker.join()
assert 1 == len(stub_broker.dead_letters_by_queue["default"])

0 comments on commit 848a408

Please sign in to comment.