From 8420cfd8223073d936c2b1109503d71680bbe63b Mon Sep 17 00:00:00 2001 From: Yulya Artyukhina Date: Wed, 16 Oct 2024 14:13:28 +0200 Subject: [PATCH 1/3] Fix acknowledge reminder task (#5179) # What this PR does - Adds 10 minutes lock for acknowledge reminder task to prevent task duplicates, that causes posting multiple reminder messages and flooding in Slack threads. - Adds a new signal for acknowledge reminder task instead of using `alert_group_action_triggered_signal` since it is used only to post reminder message in Slack thread and it's not needed to be processed by other representatives ## Which issue(s) this PR closes Related to https://github.com/grafana/oncall-private/issues/2953 ## Checklist - [x] Unit, integration, and e2e (if applicable) tests updated - [x] Documentation added (or `pr:no public docs` PR label added if not required) - [x] Added the relevant release notes label (see labels prefixed w/ `release:`). These labels dictate how your PR will show up in the autogenerated release notes. --- engine/apps/alerts/signals.py | 7 ++ .../apps/alerts/tasks/acknowledge_reminder.py | 34 ++++++++-- .../alerts/tests/test_acknowledge_reminder.py | 57 ++++++++++++++-- .../alert_group_representative.py | 67 +++++++++++++------ .../telegram/alert_group_representative.py | 1 - engine/settings/celery_task_routes.py | 1 + 6 files changed, 138 insertions(+), 29 deletions(-) diff --git a/engine/apps/alerts/signals.py b/engine/apps/alerts/signals.py index 3de510280c..648e3701f2 100644 --- a/engine/apps/alerts/signals.py +++ b/engine/apps/alerts/signals.py @@ -24,6 +24,9 @@ # Signal to rerender alert group's resolution note in all connected integrations (Slack) alert_group_update_resolution_note_signal = django.dispatch.Signal() +# Signal to post acknowledge reminder message (Slack) +post_ack_reminder_message_signal = django.dispatch.Signal() + # Currently only writes error in Slack thread while notify user. Maybe it is worth to delete it? user_notification_action_triggered_signal = django.dispatch.Signal() @@ -40,6 +43,10 @@ AlertGroupSlackRepresentative.on_alert_group_update_resolution_note, ) +post_ack_reminder_message_signal.connect( + AlertGroupSlackRepresentative.on_alert_group_post_acknowledge_reminder_message, +) + user_notification_action_triggered_signal.connect( UserSlackRepresentative.on_user_action_triggered, ) diff --git a/engine/apps/alerts/tasks/acknowledge_reminder.py b/engine/apps/alerts/tasks/acknowledge_reminder.py index 73d864915d..ebbacc6392 100644 --- a/engine/apps/alerts/tasks/acknowledge_reminder.py +++ b/engine/apps/alerts/tasks/acknowledge_reminder.py @@ -2,15 +2,25 @@ from functools import partial from django.conf import settings +from django.core.cache import cache from django.db import transaction from django.utils import timezone +from apps.alerts.signals import post_ack_reminder_message_signal from common.custom_celery_tasks import shared_dedicated_queue_retry_task from .send_alert_group_signal import send_alert_group_signal from .task_logger import task_logger -MAX_RETRIES = 1 if settings.DEBUG else None +MAX_RETRIES = 1 if settings.DEBUG else 10 + + +def is_allowed_to_send_acknowledge_reminder(alert_group_id, process_id): + lock_id = f"acknowledge-reminder-lock-{alert_group_id}" + lock_period = 60 * 10 # 10 min + # cache.add returns False if the key already exists + status = cache.add(lock_id, process_id, lock_period) + return status @shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=MAX_RETRIES) @@ -28,6 +38,11 @@ def acknowledge_reminder_task(alert_group_pk: int, unacknowledge_process_id: str if unacknowledge_process_id != alert_group.last_unique_unacknowledge_process_id: return + # Don't proceed if acknowledge reminder for this alert group has already been sent recently + if not is_allowed_to_send_acknowledge_reminder(alert_group.id, unacknowledge_process_id): + task_logger.info(f"Acknowledge reminder for alert_group {alert_group_pk} has already been sent recently.") + return + organization = alert_group.channel.organization # Get timeout values @@ -55,8 +70,9 @@ def acknowledge_reminder_task(alert_group_pk: int, unacknowledge_process_id: str # unacknowledge_timeout_task uses acknowledged_by_confirmed to check if acknowledgement reminder has been confirmed # by the user. Setting to None here to indicate that the user has not confirmed the acknowledgement reminder - alert_group.acknowledged_by_confirmed = None - alert_group.save(update_fields=["acknowledged_by_confirmed"]) + if alert_group.acknowledged_by_confirmed is not None: + alert_group.acknowledged_by_confirmed = None + alert_group.save(update_fields=["acknowledged_by_confirmed"]) if unacknowledge_timeout: # "unack in N minutes if no response" is enabled unacknowledge_timeout_task.apply_async( @@ -77,7 +93,7 @@ def acknowledge_reminder_task(alert_group_pk: int, unacknowledge_process_id: str type=AlertGroupLogRecord.TYPE_ACK_REMINDER_TRIGGERED, author=alert_group.acknowledged_by_user ) task_logger.info(f"created log record {log_record.pk}, sending signal...") - transaction.on_commit(partial(send_alert_group_signal.delay, log_record.pk)) + transaction.on_commit(partial(send_post_ack_reminder_message_signal.delay, log_record.pk)) @shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=MAX_RETRIES) @@ -138,3 +154,13 @@ def unacknowledge_timeout_task(alert_group_pk: int, unacknowledge_process_id: st transaction.on_commit(partial(send_alert_group_signal.delay, log_record.pk)) alert_group.unacknowledge() alert_group.start_escalation_if_needed() + + +@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=MAX_RETRIES) +def send_post_ack_reminder_message_signal(log_record_id): + """ + Sends signal to post acknowledge reminder message to Slack thread. + The signal is connected to AlertGroupSlackRepresentative. + """ + task_logger.info(f"sending signal for posting ack reminder message, log record {log_record_id}") + post_ack_reminder_message_signal.send(sender=send_post_ack_reminder_message_signal, log_record=log_record_id) diff --git a/engine/apps/alerts/tests/test_acknowledge_reminder.py b/engine/apps/alerts/tests/test_acknowledge_reminder.py index 0e6318f490..2c0ba7f516 100644 --- a/engine/apps/alerts/tests/test_acknowledge_reminder.py +++ b/engine/apps/alerts/tests/test_acknowledge_reminder.py @@ -3,12 +3,13 @@ import pytest from celery import uuid as celery_uuid +from django.core.cache import cache from django.utils import timezone from apps.alerts.constants import ActionSource from apps.alerts.models import AlertGroup, AlertGroupLogRecord from apps.alerts.tasks import acknowledge_reminder_task -from apps.alerts.tasks.acknowledge_reminder import unacknowledge_timeout_task +from apps.alerts.tasks.acknowledge_reminder import send_post_ack_reminder_message_signal, unacknowledge_timeout_task from apps.user_management.models import Organization TASK_ID = "TASK_ID" @@ -156,8 +157,10 @@ def test_acknowledge_reminder_task_skip( @patch.object(unacknowledge_timeout_task, "apply_async") @patch.object(acknowledge_reminder_task, "apply_async") +@patch.object(send_post_ack_reminder_message_signal, "delay") @pytest.mark.django_db -def test_acknowledge_reminder_task_reschedules_itself( +def test_acknowledge_reminder_task_reschedules_itself_and_sends_signal( + mock_send_post_ack_reminder_message_signal, mock_acknowledge_reminder_task, mock_unacknowledge_timeout_task, ack_reminder_test_setup, @@ -169,9 +172,6 @@ def test_acknowledge_reminder_task_reschedules_itself( with django_capture_on_commit_callbacks(execute=True) as callbacks: acknowledge_reminder_task(alert_group.pk, TASK_ID) - # send_alert_group_signal task is queued after commit - assert len(callbacks) == 1 - mock_unacknowledge_timeout_task.assert_not_called() mock_acknowledge_reminder_task.assert_called_once_with( (alert_group.pk, TASK_ID), @@ -182,6 +182,10 @@ def test_acknowledge_reminder_task_reschedules_itself( assert log_record.type == AlertGroupLogRecord.TYPE_ACK_REMINDER_TRIGGERED assert log_record.author == alert_group.acknowledged_by_user + # send_post_ack_reminder_message_signal task is queued after commit + assert len(callbacks) == 1 + mock_send_post_ack_reminder_message_signal.assert_called_once_with(log_record.id) + @patch.object(unacknowledge_timeout_task, "apply_async") @patch.object(acknowledge_reminder_task, "apply_async") @@ -369,3 +373,46 @@ def test_ack_reminder_cancel_too_old( mock_acknowledge_reminder_task.assert_not_called() assert not alert_group.log_records.exists() + + +@pytest.mark.django_db +def test_acknowledge_reminder_skip_doubled_notification( + ack_reminder_test_setup, + django_capture_on_commit_callbacks, + caplog, +): + organization, alert_group, user = ack_reminder_test_setup( + unacknowledge_timeout=Organization.UNACKNOWLEDGE_TIMEOUT_NEVER + ) + expected_log_text = f"Acknowledge reminder for alert_group {alert_group.id} has already been sent recently." + + # check task lock cache doesn't exist + lock_id = f"acknowledge-reminder-lock-{alert_group.id}" + assert cache.get(lock_id) is None + + with patch.object(acknowledge_reminder_task, "apply_async") as mock_acknowledge_reminder_task: + with patch.object(send_post_ack_reminder_message_signal, "delay") as mock_send_signal: + with django_capture_on_commit_callbacks(execute=True): + acknowledge_reminder_task(alert_group.pk, TASK_ID) + + log_record = alert_group.log_records.get() + assert log_record.type == AlertGroupLogRecord.TYPE_ACK_REMINDER_TRIGGERED + mock_send_signal.assert_called_once_with(log_record.id) + mock_acknowledge_reminder_task.assert_called_once() + + # check task lock cache exists + assert cache.get(lock_id) == TASK_ID + + assert expected_log_text not in caplog.text + + # acknowledge_reminder_task doesn't proceed the second time if it has been called recently + with patch.object(acknowledge_reminder_task, "apply_async") as mock_acknowledge_reminder_task: + with patch.object(send_post_ack_reminder_message_signal, "delay") as mock_send_signal: + with django_capture_on_commit_callbacks(execute=True): + acknowledge_reminder_task(alert_group.pk, TASK_ID) + + assert alert_group.log_records.count() == 1 + mock_send_signal.assert_not_called() + mock_acknowledge_reminder_task.assert_not_called() + + assert expected_log_text in caplog.text diff --git a/engine/apps/slack/representatives/alert_group_representative.py b/engine/apps/slack/representatives/alert_group_representative.py index 431153e0bf..77d995e6bc 100644 --- a/engine/apps/slack/representatives/alert_group_representative.py +++ b/engine/apps/slack/representatives/alert_group_representative.py @@ -1,4 +1,5 @@ import logging +import typing from celery.utils.log import get_task_logger from django.conf import settings @@ -9,6 +10,9 @@ from apps.slack.scenarios.scenario_step import ScenarioStep from common.custom_celery_tasks import shared_dedicated_queue_retry_task +if typing.TYPE_CHECKING: + from apps.alerts.models import AlertGroupLogRecord + logger = get_task_logger(__name__) logger.setLevel(logging.DEBUG) @@ -130,29 +134,17 @@ def on_create_alert(cls, **kwargs): @classmethod def on_alert_group_action_triggered(cls, **kwargs): logger.debug("Received alert_group_action_triggered signal in SLACK representative") - from apps.alerts.models import AlertGroupLogRecord - - log_record = kwargs["log_record"] - force_sync = kwargs.get("force_sync", False) - if isinstance(log_record, AlertGroupLogRecord): - log_record_id = log_record.pk - else: - log_record_id = log_record - - try: - log_record = AlertGroupLogRecord.objects.get(pk=log_record_id) - except AlertGroupLogRecord.DoesNotExist: - logger.warning( - f"on_alert_group_action_triggered: log record {log_record_id} never created or has been deleted" - ) + log_record = cls.get_log_record_from_kwargs(**kwargs) + if not log_record: return + force_sync = kwargs.get("force_sync", False) if log_record.action_source == ActionSource.SLACK or force_sync: - logger.debug(f"SLACK on_alert_group_action_triggered: sync {log_record_id} {force_sync}") - on_alert_group_action_triggered_async(log_record_id) + logger.debug(f"SLACK on_alert_group_action_triggered: sync {log_record.id} {force_sync}") + on_alert_group_action_triggered_async(log_record.id) else: - logger.debug(f"SLACK on_alert_group_action_triggered: async {log_record_id} {force_sync}") - on_alert_group_action_triggered_async.apply_async((log_record_id,)) + logger.debug(f"SLACK on_alert_group_action_triggered: async {log_record.id} {force_sync}") + on_alert_group_action_triggered_async.apply_async((log_record.id,)) @classmethod def on_alert_group_update_resolution_note(cls, **kwargs): @@ -167,6 +159,26 @@ def on_alert_group_update_resolution_note(cls, **kwargs): step = UpdateResolutionNoteStep(organization.slack_team_identity, organization) step.process_signal(alert_group, resolution_note) + @classmethod + def on_alert_group_post_acknowledge_reminder_message(cls, **kwargs): + log_record = cls.get_log_record_from_kwargs(**kwargs) + if not log_record: + return + slack_team_identity = log_record.alert_group.channel.organization.slack_team_identity + alert_group = log_record.alert_group + logger.debug( + f"Received post_ack_reminder_message_signal signal in SLACK representative for alert_group {alert_group.id}" + ) + if not (log_record.alert_group.slack_message and slack_team_identity): + logger.debug( + f"SLACK representative is NOT applicable for alert_group {alert_group.id}, log record {log_record.id}" + ) + return + + AcknowledgeConfirmationStep = ScenarioStep.get_step("distribute_alerts", "AcknowledgeConfirmationStep") + step = AcknowledgeConfirmationStep(slack_team_identity) + step.process_signal(log_record) + def on_acknowledge(self): AcknowledgeGroupStep = ScenarioStep.get_step("distribute_alerts", "AcknowledgeGroupStep") step = AcknowledgeGroupStep(self.log_record.alert_group.channel.organization.slack_team_identity) @@ -229,6 +241,7 @@ def on_auto_un_acknowledge(self): self.on_un_acknowledge() def on_ack_reminder_triggered(self): + # deprecated, remove this handler after release AcknowledgeConfirmationStep = ScenarioStep.get_step("distribute_alerts", "AcknowledgeConfirmationStep") step = AcknowledgeConfirmationStep(self.log_record.alert_group.channel.organization.slack_team_identity) step.process_signal(self.log_record) @@ -258,3 +271,19 @@ def get_handler_name(self): @classmethod def on_handler_not_found(cls): pass + + @classmethod + def get_log_record_from_kwargs(cls, **kwargs) -> typing.Optional["AlertGroupLogRecord"]: + from apps.alerts.models import AlertGroupLogRecord + + log_record = None + log_record_id = kwargs["log_record"] + + if not isinstance(log_record_id, AlertGroupLogRecord): + try: + log_record = AlertGroupLogRecord.objects.get(pk=log_record_id) + except AlertGroupLogRecord.DoesNotExist: + logger.warning(f"log record {log_record_id} never created or has been deleted") + else: + log_record = log_record_id + return log_record diff --git a/engine/apps/telegram/alert_group_representative.py b/engine/apps/telegram/alert_group_representative.py index ef3ae9927f..9f0ce844af 100644 --- a/engine/apps/telegram/alert_group_representative.py +++ b/engine/apps/telegram/alert_group_representative.py @@ -41,7 +41,6 @@ def get_handlers_map(): AlertGroupLogRecord.TYPE_AUTO_UN_ACK: "alert_group_action", AlertGroupLogRecord.TYPE_RESOLVED: "alert_group_action", AlertGroupLogRecord.TYPE_UN_RESOLVED: "alert_group_action", - AlertGroupLogRecord.TYPE_ACK_REMINDER_TRIGGERED: "alert_group_action", AlertGroupLogRecord.TYPE_SILENCE: "alert_group_action", AlertGroupLogRecord.TYPE_UN_SILENCE: "alert_group_action", AlertGroupLogRecord.TYPE_ATTACHED: "alert_group_action", diff --git a/engine/settings/celery_task_routes.py b/engine/settings/celery_task_routes.py index c6e0ad7882..04a8ffa49a 100644 --- a/engine/settings/celery_task_routes.py +++ b/engine/settings/celery_task_routes.py @@ -93,6 +93,7 @@ # CRITICAL "apps.alerts.tasks.acknowledge_reminder.acknowledge_reminder_task": {"queue": "critical"}, "apps.alerts.tasks.acknowledge_reminder.unacknowledge_timeout_task": {"queue": "critical"}, + "apps.alerts.tasks.acknowledge_reminder.send_post_ack_reminder_message_signal": {"queue": "critical"}, "apps.alerts.tasks.declare_incident.declare_incident": {"queue": "critical"}, "apps.alerts.tasks.distribute_alert.send_alert_create_signal": {"queue": "critical"}, "apps.alerts.tasks.escalate_alert_group.escalate_alert_group": {"queue": "critical"}, From f159a3f72f4edb0c7839a87814d002fd657a82d1 Mon Sep 17 00:00:00 2001 From: Matias Bordese Date: Wed, 16 Oct 2024 10:01:21 -0300 Subject: [PATCH 2/3] Undo schedules using ical cached final data (#5182) Undo one of the changes from https://github.com/grafana/oncall/pull/5172 (since it seems to make the schedules endpoint a bit worse; OTOH, other changes improved latency). Eventually we may consider using the cached on-call users information instead. --- engine/apps/api/serializers/schedule_base.py | 2 +- engine/apps/public_api/serializers/schedules_base.py | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/engine/apps/api/serializers/schedule_base.py b/engine/apps/api/serializers/schedule_base.py index 02de45d8cb..a647e859dc 100644 --- a/engine/apps/api/serializers/schedule_base.py +++ b/engine/apps/api/serializers/schedule_base.py @@ -37,7 +37,7 @@ class Meta: "enable_web_overrides", ] - SELECT_RELATED = ["organization"] + SELECT_RELATED = ["organization", "team", "user_group"] CANT_UPDATE_USER_GROUP_WARNING = ( "Cannot update the user group, make sure to grant user group modification rights to " diff --git a/engine/apps/public_api/serializers/schedules_base.py b/engine/apps/public_api/serializers/schedules_base.py index 2dabd254e0..655380de70 100644 --- a/engine/apps/public_api/serializers/schedules_base.py +++ b/engine/apps/public_api/serializers/schedules_base.py @@ -15,7 +15,7 @@ class ScheduleBaseSerializer(EagerLoadingMixin, serializers.ModelSerializer): slack = serializers.DictField(required=False) team_id = TeamPrimaryKeyRelatedField(required=False, allow_null=True, source="team") - SELECT_RELATED = ["team"] + SELECT_RELATED = ["team", "user_group"] def create(self, validated_data): validated_data = self._correct_validated_data(validated_data) @@ -23,9 +23,7 @@ def create(self, validated_data): return super().create(validated_data) def get_on_call_now(self, obj): - users_on_call = list_users_to_notify_from_ical( - obj, datetime.datetime.now(datetime.timezone.utc), from_cached_final=True - ) + users_on_call = list_users_to_notify_from_ical(obj, datetime.datetime.now(datetime.timezone.utc)) if users_on_call is not None: return [user.public_primary_key for user in users_on_call] else: From 00158a55274609ffe2c91b9f4966da8ac34afe76 Mon Sep 17 00:00:00 2001 From: Matias Bordese Date: Wed, 16 Oct 2024 12:33:06 -0300 Subject: [PATCH 3/3] Handle incident client general request errors (#5184) Handle some (unexpected) incident request errors more gracefully. --- engine/common/incident_api/client.py | 23 +++++++++++++--- .../common/incident_api/tests/test_client.py | 27 +++++++++++++++++++ 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/engine/common/incident_api/client.py b/engine/common/incident_api/client.py index 7013b45a3c..ac5156fce7 100644 --- a/engine/common/incident_api/client.py +++ b/engine/common/incident_api/client.py @@ -4,6 +4,7 @@ import requests from django.conf import settings +from requests.exceptions import RequestException from common.constants.plugin_ids import PluginID @@ -91,6 +92,18 @@ def __init__(self, api_url: str, api_token: str) -> None: def _request_headers(self): return {"User-Agent": settings.GRAFANA_COM_USER_AGENT, "Authorization": f"Bearer {self.api_token}"} + def _make_request(self, url, *args, **kwargs): + try: + response = requests.post(url, *args, **kwargs) + except RequestException as e: + raise IncidentAPIException( + status=e.response.status_code if e.response else 500, + url=e.response.request.url if e.response else url, + msg=e.response.text if e.response else "Unexpected error", + method=e.response.request.method if e.response else "POST", + ) + return response + def _check_response(self, response: requests.models.Response): message = "" @@ -119,7 +132,7 @@ def create_incident( endpoint = "api/v1/IncidentsService.CreateIncident" url = self.api_url + endpoint # NOTE: invalid severity will raise a 500 error - response = requests.post( + response = self._make_request( url, json={ "title": title, @@ -137,7 +150,9 @@ def create_incident( def get_incident(self, incident_id: str) -> typing.Tuple[IncidentDetails, requests.models.Response]: endpoint = "api/v1/IncidentsService.GetIncident" url = self.api_url + endpoint - response = requests.post(url, json={"incidentID": incident_id}, timeout=TIMEOUT, headers=self._request_headers) + response = self._make_request( + url, json={"incidentID": incident_id}, timeout=TIMEOUT, headers=self._request_headers + ) self._check_response(response) return response.json().get("incident"), response @@ -146,7 +161,7 @@ def get_severities(self) -> typing.Tuple[typing.List[SeverityDetails], requests. endpoint = "api/SeveritiesService.GetOrgSeverities" url = self.api_url + endpoint # pass empty json payload otherwise it will return a 500 response - response = requests.post(url, timeout=TIMEOUT, headers=self._request_headers, json={}) + response = self._make_request(url, timeout=TIMEOUT, headers=self._request_headers, json={}) self._check_response(response) return response.json().get("severities"), response @@ -155,7 +170,7 @@ def add_activity( ) -> typing.Tuple[ActivityItemDetails, requests.models.Response]: endpoint = "api/v1/ActivityService.AddActivity" url = self.api_url + endpoint - response = requests.post( + response = self._make_request( url, json={"incidentID": incident_id, "activityKind": kind, "body": body}, timeout=TIMEOUT, diff --git a/engine/common/incident_api/tests/test_client.py b/engine/common/incident_api/tests/test_client.py index 2ba6840d5b..ec82262d88 100644 --- a/engine/common/incident_api/tests/test_client.py +++ b/engine/common/incident_api/tests/test_client.py @@ -1,7 +1,9 @@ import json +from unittest.mock import patch import httpretty import pytest +from requests.exceptions import RequestException from rest_framework import status from common.incident_api.client import ( @@ -185,3 +187,28 @@ def test_error_handling(endpoint, client_method_name, args): assert excinfo.value.msg == response_data["error"] assert excinfo.value.url == url assert excinfo.value.method == "POST" + + +@pytest.mark.parametrize( + "endpoint, client_method_name, args", + [ + ("api/v1/IncidentsService.CreateIncident", "create_incident", ("title",)), + ("api/v1/IncidentsService.GetIncident", "get_incident", ("incident-id",)), + ("api/SeveritiesService.GetOrgSeverities", "get_severities", ()), + ("api/v1/ActivityService.AddActivity", "add_activity", ("incident-id", "content")), + ], +) +@httpretty.activate(verbose=True, allow_net_connect=False) +def test_unexpected_error_handling(endpoint, client_method_name, args): + stack_url = "https://foobar.grafana.net" + api_token = "asdfasdfasdfasdf" + client = IncidentAPIClient(stack_url, api_token) + url = f"{stack_url}{client.INCIDENT_BASE_PATH}{endpoint}" + with patch("common.incident_api.client.requests.post", side_effect=RequestException): + with pytest.raises(IncidentAPIException) as excinfo: + client_method = getattr(client, client_method_name) + client_method(*args) + assert excinfo.value.status == 500 + assert excinfo.value.msg == "Unexpected error" + assert excinfo.value.url == url + assert excinfo.value.method == "POST"