Skip to content

Commit

Permalink
Reschedule rate limited telegram task instead of retry (#5178)
Browse files Browse the repository at this point in the history
  • Loading branch information
matiasb authored Oct 15, 2024
1 parent 287bfcc commit 4667960
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 9 deletions.
35 changes: 28 additions & 7 deletions engine/apps/alerts/tasks/notify_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
from apps.user_management.models import User


RETRY_TIMEOUT_HOURS = 1


def schedule_send_bundled_notification_task(
user_notification_bundle: "UserNotificationBundle", alert_group: "AlertGroup"
):
Expand Down Expand Up @@ -445,10 +448,29 @@ def perform_notification(log_record_pk, use_default_notification_policy_fallback
try:
TelegramToUserConnector.notify_user(user, alert_group, notification_policy)
except RetryAfter as e:
countdown = getattr(e, "retry_after", 3)
raise perform_notification.retry(
(log_record_pk, use_default_notification_policy_fallback), countdown=countdown, exc=e
)
task_logger.exception(f"Telegram API rate limit exceeded. Retry after {e.retry_after} seconds.")
# check how much time has passed since log record was created
# to prevent eternal loop of restarting perform_notification task
if timezone.now() < log_record.created_at + timezone.timedelta(hours=RETRY_TIMEOUT_HOURS):
countdown = getattr(e, "retry_after", 3)
perform_notification.apply_async(
(log_record_pk, use_default_notification_policy_fallback), countdown=countdown
)
else:
task_logger.debug(
f"telegram notification for alert_group {alert_group.pk} failed because of rate limit"
)
UserNotificationPolicyLogRecord(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
notification_policy=notification_policy,
reason="Telegram rate limit exceeded",
alert_group=alert_group,
notification_step=notification_policy.step,
notification_channel=notification_channel,
notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_TELEGRAM_RATELIMIT,
).save()
return

elif notification_channel == UserNotificationPolicy.NotificationChannel.SLACK:
# TODO: refactor checking the possibility of sending a notification in slack
Expand Down Expand Up @@ -516,13 +538,12 @@ def perform_notification(log_record_pk, use_default_notification_policy_fallback
).save()
return

retry_timeout_hours = 1
if alert_group.slack_message:
alert_group.slack_message.send_slack_notification(user, alert_group, notification_policy)
task_logger.debug(f"Finished send_slack_notification for alert_group {alert_group.pk}.")
# check how much time has passed since log record was created
# to prevent eternal loop of restarting perform_notification task
elif timezone.now() < log_record.created_at + timezone.timedelta(hours=retry_timeout_hours):
elif timezone.now() < log_record.created_at + timezone.timedelta(hours=RETRY_TIMEOUT_HOURS):
task_logger.debug(
f"send_slack_notification for alert_group {alert_group.pk} failed because slack message "
f"does not exist. Restarting perform_notification."
Expand All @@ -534,7 +555,7 @@ def perform_notification(log_record_pk, use_default_notification_policy_fallback
else:
task_logger.debug(
f"send_slack_notification for alert_group {alert_group.pk} failed because slack message "
f"after {retry_timeout_hours} hours still does not exist"
f"after {RETRY_TIMEOUT_HOURS} hours still does not exist"
)
UserNotificationPolicyLogRecord(
author=user,
Expand Down
20 changes: 19 additions & 1 deletion engine/apps/alerts/tests/test_notify_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,12 +360,30 @@ def test_perform_notification_telegram_retryafter_error(
countdown = 15
exc = RetryAfter(countdown)
with patch.object(TelegramToUserConnector, "notify_user", side_effect=exc) as mock_notify_user:
with pytest.raises(RetryAfter):
with patch.object(perform_notification, "apply_async") as mock_apply_async:
perform_notification(log_record.pk, False)

mock_notify_user.assert_called_once_with(user, alert_group, user_notification_policy)
# task is rescheduled using the countdown value from the exception
mock_apply_async.assert_called_once_with((log_record.pk, False), countdown=countdown)
assert alert_group.personal_log_records.last() == log_record

# but if the log was too old, skip and create a failed log record
log_record.created_at = timezone.now() - timezone.timedelta(minutes=90)
log_record.save()
with patch.object(TelegramToUserConnector, "notify_user", side_effect=exc) as mock_notify_user:
with patch.object(perform_notification, "apply_async") as mock_apply_async:
perform_notification(log_record.pk, False)
mock_notify_user.assert_called_once_with(user, alert_group, user_notification_policy)
assert not mock_apply_async.called
last_log_record = UserNotificationPolicyLogRecord.objects.last()
assert last_log_record.type == UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED
assert last_log_record.reason == "Telegram rate limit exceeded"
assert (
last_log_record.notification_error_code
== UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_TELEGRAM_RATELIMIT
)


@patch("apps.base.models.UserNotificationPolicy.get_default_fallback_policy")
@patch("apps.base.tests.messaging_backend.TestOnlyBackend.notify_user")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ class UserNotificationPolicyLogRecord(models.Model):
ERROR_NOTIFICATION_TELEGRAM_USER_IS_DEACTIVATED,
ERROR_NOTIFICATION_MOBILE_USER_HAS_NO_ACTIVE_DEVICE,
ERROR_NOTIFICATION_FORMATTING_ERROR,
) = range(29)
ERROR_NOTIFICATION_IN_TELEGRAM_RATELIMIT,
) = range(30)

# for this errors we want to send message to general log channel
ERRORS_TO_SEND_IN_SLACK_CHANNEL = [
Expand Down Expand Up @@ -304,6 +305,10 @@ def render_log_line_action(self, for_slack=False, substitute_author_with_tag=Fal
result += f"failed to notify {user_verbal} in Slack, because channel is archived"
elif self.notification_error_code == UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_SLACK_RATELIMIT:
result += f"failed to notify {user_verbal} in Slack due to Slack rate limit"
elif (
self.notification_error_code == UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_TELEGRAM_RATELIMIT
):
result += f"failed to notify {user_verbal} in Telegram due to Telegram rate limit"
elif self.notification_error_code == UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_FORBIDDEN:
result += f"failed to notify {user_verbal}, not allowed"
elif (
Expand Down

0 comments on commit 4667960

Please sign in to comment.