Skip to content

Commit

Permalink
fix(consumer): Ensure backpressure is properly handled in stale messa…
Browse files Browse the repository at this point in the history
…ge routing (#83271)

Previously we were not handling the case where a `MessageRejected` error
which signals backpressure from downstream steps might be received in
the dlq stale messages strategy. If this happens we can just ignore the
error, we will retry on the next loop.
  • Loading branch information
lynnagara authored Jan 11, 2025
1 parent f8f3a8a commit 5f9a15d
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions src/sentry/consumers/dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@

from arroyo.backends.kafka import KafkaPayload, KafkaProducer
from arroyo.dlq import InvalidMessage, KafkaDlqProducer
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
from arroyo.processing.strategies.abstract import (
MessageRejected,
ProcessingStrategy,
ProcessingStrategyFactory,
)
from arroyo.types import FILTERED_PAYLOAD, BrokerValue, Commit, FilteredPayload, Message, Partition
from arroyo.types import Topic as ArroyoTopic
from arroyo.types import Value
Expand Down Expand Up @@ -127,9 +131,12 @@ def poll(self) -> None:
if self.offsets_to_forward:
if time.time() > self.last_forwarded_offsets + 1:
filtered_message = Message(Value(FILTERED_PAYLOAD, self.offsets_to_forward))
self.next_step.submit(filtered_message)
self.offsets_to_forward = {}
self.last_forwarded_offsets = time.time()
try:
self.next_step.submit(filtered_message)
self.offsets_to_forward = {}
self.last_forwarded_offsets = time.time()
except MessageRejected:
pass

def join(self, timeout: float | None = None) -> None:
self.next_step.join(timeout)
Expand Down

0 comments on commit 5f9a15d

Please sign in to comment.