diff --git a/src/sentry/consumers/dlq.py b/src/sentry/consumers/dlq.py index 4e8ea5f7939d25..eb3e7e1fec89b8 100644 --- a/src/sentry/consumers/dlq.py +++ b/src/sentry/consumers/dlq.py @@ -7,7 +7,11 @@ from arroyo.backends.kafka import KafkaPayload, KafkaProducer from arroyo.dlq import InvalidMessage, KafkaDlqProducer -from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory +from arroyo.processing.strategies.abstract import ( + MessageRejected, + ProcessingStrategy, + ProcessingStrategyFactory, +) from arroyo.types import FILTERED_PAYLOAD, BrokerValue, Commit, FilteredPayload, Message, Partition from arroyo.types import Topic as ArroyoTopic from arroyo.types import Value @@ -127,9 +131,12 @@ def poll(self) -> None: if self.offsets_to_forward: if time.time() > self.last_forwarded_offsets + 1: filtered_message = Message(Value(FILTERED_PAYLOAD, self.offsets_to_forward)) - self.next_step.submit(filtered_message) - self.offsets_to_forward = {} - self.last_forwarded_offsets = time.time() + try: + self.next_step.submit(filtered_message) + self.offsets_to_forward = {} + self.last_forwarded_offsets = time.time() + except MessageRejected: + pass def join(self, timeout: float | None = None) -> None: self.next_step.join(timeout)