Skip to content

Commit

Permalink
Add support for ack/nack results so middleware can manage.
Browse files Browse the repository at this point in the history
  • Loading branch information
jenstroeger committed Aug 13, 2021
1 parent 8dd6a75 commit b1a29bb
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 12 deletions.
10 changes: 8 additions & 2 deletions dramatiq/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,15 +272,21 @@ def __iter__(self): # pragma: no cover

def ack(self, message): # pragma: no cover
"""Acknowledge that a message has been processed, removing it
from the broker.
from the broker. Return True if the message was acknowledged,
False if it wasn't, or None if no action was taken (as happens
with the StubBroker). Raises ConnectionClosed if a connection
error occurred.
Parameters:
message(MessageProxy): The message to acknowledge.
"""
raise NotImplementedError

def nack(self, message): # pragma: no cover
"""Move a message to the dead-letter queue.
"""Move a message to the dead-letter queue. Return True if
the message was moved successfully, False otherwise, or None
if no action was taken (as happens with the StubBroker).
Raises ConnectionClosed if a connection error was occurred.
Parameters:
message(MessageProxy): The message to reject.
Expand Down
4 changes: 4 additions & 0 deletions dramatiq/brokers/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,25 +467,29 @@ def ack(self, message):
self.connection.add_callback_threadsafe(
partial(self.channel.basic_ack, message._tag),
)
return True
except (pika.exceptions.AMQPConnectionError,
pika.exceptions.AMQPChannelError) as e:
raise ConnectionClosed(e) from None
except KeyError:
self.logger.warning("Failed to ack message: not in known tags.")
except Exception: # pragma: no cover
self.logger.warning("Failed to ack message.", exc_info=True)
return False

def nack(self, message):
try:
self.known_tags.remove(message._tag)
self._nack(message._tag)
return True
except (pika.exceptions.AMQPConnectionError,
pika.exceptions.AMQPChannelError) as e:
raise ConnectionClosed(e) from None
except KeyError:
self.logger.warning("Failed to nack message: not in known tags.")
except Exception: # pragma: no cover
self.logger.warning("Failed to nack message.", exc_info=True)
return False

def _nack(self, tag):
self.connection.add_callback_threadsafe(
Expand Down
2 changes: 2 additions & 0 deletions dramatiq/brokers/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ def ack(self, message):
finally:
if message.message_id in self.queued_message_ids:
self.queued_message_ids.remove(message.message_id)
return True

def nack(self, message):
try:
Expand All @@ -320,6 +321,7 @@ def nack(self, message):
finally:
if message.message_id in self.queued_message_ids:
self.queued_message_ids.remove(message.message_id)
return True

def requeue(self, messages):
message_ids = [message.options["redis_message_id"] for message in messages]
Expand Down
4 changes: 2 additions & 2 deletions dramatiq/middleware/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ def before_ack(self, broker, message):
"""Called before a message is acknowledged.
"""

def after_ack(self, broker, message):
def after_ack(self, broker, message, acked=None):
"""Called after a message has been acknowledged.
"""

def before_nack(self, broker, message):
"""Called before a message is rejected.
"""

def after_nack(self, broker, message):
def after_nack(self, broker, message, nacked=None):
"""Called after a message has been rejected.
"""

Expand Down
2 changes: 1 addition & 1 deletion dramatiq/middleware/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def after_worker_shutdown(self, broker, worker):
self.logger.debug("Marking process dead...")
multiprocess.mark_process_dead(os.getpid(), DB_PATH)

def after_nack(self, broker, message):
def after_nack(self, broker, message, nacked=False):
labels = (message.queue_name, message.actor_name)
self.total_rejected_messages.labels(*labels).inc()

Expand Down
2 changes: 1 addition & 1 deletion dramatiq/results/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def after_process_message(self, broker, message, *, result=None, exception=None)
"the value has been discarded." % message.actor_name
)

def after_nack(self, broker, message):
def after_nack(self, broker, message, nacked=None):
store_results, result_ttl = self._lookup_options(broker, message)
if store_results and message.failed:
exception = message._exception or Exception("unknown")
Expand Down
8 changes: 4 additions & 4 deletions dramatiq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,14 +338,14 @@ def post_process_message(self, message):
if message.failed:
self.logger.debug("Rejecting message %r.", message.message_id)
self.broker.emit_before("nack", message)
self.consumer.nack(message)
self.broker.emit_after("nack", message)
nacked = self.consumer.nack(message)
self.broker.emit_after("nack", message, nacked=nacked)

else:
self.logger.debug("Acknowledging message %r.", message.message_id)
self.broker.emit_before("ack", message)
self.consumer.ack(message)
self.broker.emit_after("ack", message)
acked = self.consumer.ack(message)
self.broker.emit_after("ack", message, acked=acked)

return

Expand Down
4 changes: 2 additions & 2 deletions tests/test_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,10 @@ def test_rabbitmq_consumers_ignore_unknown_messages_in_ack_and_nack(rabbitmq_bro

# If I attempt to ack a Message that wasn't consumed off of it
# I expect nothing to happen
assert consumer.ack(Mock(_tag=1)) is None
assert consumer.ack(Mock(_tag=1)) is False

# Likewise for nack
assert consumer.nack(Mock(_tag=1)) is None
assert consumer.nack(Mock(_tag=1)) is False


def test_ignore_scary_logs_filter_ignores_logs():
Expand Down

0 comments on commit b1a29bb

Please sign in to comment.