From f22c1854cb460289e43b9ca7dfe1b936819ff3bc Mon Sep 17 00:00:00 2001 From: IDoneShaveIt Date: Mon, 22 Jan 2024 15:36:21 +0200 Subject: [PATCH 1/3] Unified alerts' API and fetcher specific type methods + split DataMonitoringAlerts flow into steps (populate, fetch, filter, format, skip, send) --- .../monitor/api/alerts/alert_filters.py | 145 ++--- elementary/monitor/api/alerts/alerts.py | 130 +---- elementary/monitor/api/alerts/schema.py | 64 +-- .../alerts/data_monitoring_alerts.py | 79 ++- elementary/monitor/fetchers/alerts/alerts.py | 63 +-- .../fetchers/alerts/schema/alert_data.py | 325 +++++++++++ .../fetchers/alerts/schema/pending_alerts.py | 338 +----------- tests/mocks/fetchers/alerts_fetcher_mock.py | 91 ++-- .../monitor/api/alerts/test_alert_filters.py | 503 +++++++++++------- .../monitor/api/alerts/test_alerts_api.py | 37 +- .../alerts/test_data_monitoring_alerts.py | 8 +- .../fetchers/alerts/schemas/__init__.py | 0 .../test_alert_data_schema.py} | 30 +- .../fetchers/alerts/test_alerts_fetcher.py | 6 +- 14 files changed, 858 insertions(+), 961 deletions(-) create mode 100644 elementary/monitor/fetchers/alerts/schema/alert_data.py create mode 100644 tests/unit/monitor/fetchers/alerts/schemas/__init__.py rename tests/unit/monitor/fetchers/alerts/{test_alerts_schema.py => schemas/test_alert_data_schema.py} (75%) diff --git a/elementary/monitor/api/alerts/alert_filters.py b/elementary/monitor/api/alerts/alert_filters.py index 8b21eaa6d..b3a271049 100644 --- a/elementary/monitor/api/alerts/alert_filters.py +++ b/elementary/monitor/api/alerts/alert_filters.py @@ -1,6 +1,6 @@ import json from functools import reduce -from typing import List, Union +from typing import List from elementary.monitor.data_monitoring.schema import ( FilterSchema, @@ -9,9 +9,8 @@ StatusFilterSchema, ) from elementary.monitor.fetchers.alerts.schema.pending_alerts import ( - PendingModelAlertSchema, - PendingSourceFreshnessAlertSchema, - PendingTestAlertSchema, + AlertTypes, + 
PendingAlertSchema, ) from elementary.utils.log import get_logger @@ -19,17 +18,9 @@ def filter_alerts( - alerts: Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], - ], + alerts: List[PendingAlertSchema], alerts_filter: FiltersSchema = FiltersSchema(), -) -> Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], -]: +) -> List[PendingAlertSchema]: # If the filter is on invocation stuff, it's not relevant to alerts and we return an empty list if ( alerts_filter.invocation_id is not None @@ -59,21 +50,9 @@ def filter_alerts( def _find_common_alerts( - first_alerts: Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], - ], - second_alerts: Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], - ], -) -> Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], -]: + first_alerts: List[PendingAlertSchema], + second_alerts: List[PendingAlertSchema], +) -> List[PendingAlertSchema]: first_hashable_alerts = [alert.json(sort_keys=True) for alert in first_alerts] second_hashable_alerts = [alert.json(sort_keys=True) for alert in second_alerts] common_hashable_alerts = [ @@ -94,17 +73,9 @@ def _find_common_alerts( def _filter_alerts_by_tags( - alerts: Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], - ], + alerts: List[PendingAlertSchema], tags_filters: List[FilterSchema], -) -> Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], -]: +) -> List[PendingAlertSchema]: if not tags_filters: return [*alerts] @@ -114,26 +85,18 @@ def _filter_alerts_by_tags( for tags_filter in tags_filters: filtered_alerts_by_tags = [] for alert in alerts: - if any(tag in 
(alert.tags or []) for tag in tags_filter.values): + if any(tag in (alert.data.tags or []) for tag in tags_filter.values): filtered_alerts_by_tags.append(alert) grouped_filtered_alerts_by_tags.append(filtered_alerts_by_tags) # AND filter between all tags_filters - return reduce(_find_common_alerts, grouped_filtered_alerts_by_tags) # type: ignore[return-value, arg-type] + return reduce(_find_common_alerts, grouped_filtered_alerts_by_tags) def _filter_alerts_by_owners( - alerts: Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], - ], + alerts: List[PendingAlertSchema], owners_filters: List[FilterSchema], -) -> Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], -]: +) -> List[PendingAlertSchema]: if not owners_filters: return [*alerts] @@ -143,26 +106,20 @@ def _filter_alerts_by_owners( for owners_filter in owners_filters: filtered_alerts_by_owners = [] for alert in alerts: - if any(owner in alert.unified_owners for owner in owners_filter.values): + if any( + owner in alert.data.unified_owners for owner in owners_filter.values + ): filtered_alerts_by_owners.append(alert) grouped_filtered_alerts_by_owners.append(filtered_alerts_by_owners) # AND filter between all owners_filters - return reduce(_find_common_alerts, grouped_filtered_alerts_by_owners) # type: ignore[return-value, arg-type] + return reduce(_find_common_alerts, grouped_filtered_alerts_by_owners) def _filter_alerts_by_models( - alerts: Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], - ], + alerts: List[PendingAlertSchema], models_filters: List[FilterSchema], -) -> Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], -]: +) -> List[PendingAlertSchema]: if not models_filters: return [*alerts] @@ -173,40 +130,36 @@ def _filter_alerts_by_models( 
filtered_alerts_by_models = [] for alert in alerts: if any( - (alert.model_unique_id and alert.model_unique_id.endswith(model)) + ( + alert.data.model_unique_id + and alert.data.model_unique_id.endswith(model) + ) for model in models_filter.values ): filtered_alerts_by_models.append(alert) grouped_filtered_alerts_by_models.append(filtered_alerts_by_models) # AND filter between all models_filters - return reduce(_find_common_alerts, grouped_filtered_alerts_by_models) # type: ignore[return-value, arg-type] + return reduce(_find_common_alerts, grouped_filtered_alerts_by_models) def _filter_alerts_by_node_names( - alerts: Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], - ], + alerts: List[PendingAlertSchema], node_names_filters: List[str], -) -> Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], -]: +) -> List[PendingAlertSchema]: if not node_names_filters: return [*alerts] filtered_alerts = [] for alert in alerts: alert_node_name = None - if isinstance(alert, PendingTestAlertSchema): - alert_node_name = alert.test_name - elif isinstance(alert, PendingModelAlertSchema) or isinstance( - alert, PendingSourceFreshnessAlertSchema + alert_type = AlertTypes(alert.type) + if alert_type is AlertTypes.TEST: + alert_node_name = alert.data.test_name # type: ignore[union-attr] + elif ( + alert_type is AlertTypes.MODEL or alert_type is AlertTypes.SOURCE_FRESHNESS ): - alert_node_name = alert.model_unique_id + alert_node_name = alert.data.model_unique_id else: # Shouldn't happen raise Exception(f"Unexpected alert type: {type(alert)}") @@ -222,17 +175,9 @@ def _filter_alerts_by_node_names( def _filter_alerts_by_statuses( - alerts: Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], - ], + alerts: List[PendingAlertSchema], statuses_filters: List[StatusFilterSchema], -) -> Union[ - 
List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], -]: +) -> List[PendingAlertSchema]: if not statuses_filters: return [*alerts] @@ -242,26 +187,18 @@ def _filter_alerts_by_statuses( for statuses_filter in statuses_filters: filtered_alerts_by_statuses = [] for alert in alerts: - if any(status == alert.status for status in statuses_filter.values): + if any(status == alert.data.status for status in statuses_filter.values): filtered_alerts_by_statuses.append(alert) grouped_filtered_alerts_by_statuses.append(filtered_alerts_by_statuses) # AND filter between all statuses_filters - return reduce(_find_common_alerts, grouped_filtered_alerts_by_statuses) # type: ignore[return-value, arg-type] + return reduce(_find_common_alerts, grouped_filtered_alerts_by_statuses) def _filter_alerts_by_resource_types( - alerts: Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], - ], + alerts: List[PendingAlertSchema], resource_types_filters: List[ResourceTypeFilterSchema], -) -> Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], -]: +) -> List[PendingAlertSchema]: if not resource_types_filters: return [*alerts] @@ -272,7 +209,7 @@ def _filter_alerts_by_resource_types( filtered_alerts_by_resource_types = [] for alert in alerts: if any( - resource_type == alert.resource_type.value + resource_type == alert.data.resource_type.value for resource_type in resource_types_filter.values ): filtered_alerts_by_resource_types.append(alert) @@ -281,4 +218,4 @@ def _filter_alerts_by_resource_types( ) # AND filter between all resource_types_filters - return reduce(_find_common_alerts, grouped_filtered_alerts_by_resource_types) # type: ignore[return-value, arg-type] + return reduce(_find_common_alerts, grouped_filtered_alerts_by_resource_types) diff --git a/elementary/monitor/api/alerts/alerts.py 
b/elementary/monitor/api/alerts/alerts.py index 26c0803a4..4e14f2653 100644 --- a/elementary/monitor/api/alerts/alerts.py +++ b/elementary/monitor/api/alerts/alerts.py @@ -1,25 +1,13 @@ from collections import defaultdict from datetime import datetime -from typing import DefaultDict, Dict, List, Optional, Union +from typing import DefaultDict, Dict, List, Optional from elementary.clients.api.api_client import APIClient from elementary.clients.dbt.dbt_runner import DbtRunner from elementary.config.config import Config -from elementary.monitor.api.alerts.alert_filters import filter_alerts -from elementary.monitor.api.alerts.schema import ( - AlertsSchema, - ModelAlertsSchema, - SortedAlertsSchema, - SourceFreshnessAlertsSchema, - TestAlertsSchema, -) -from elementary.monitor.data_monitoring.schema import FiltersSchema +from elementary.monitor.api.alerts.schema import SortedAlertsSchema from elementary.monitor.fetchers.alerts.alerts import AlertsFetcher -from elementary.monitor.fetchers.alerts.schema.pending_alerts import ( - PendingModelAlertSchema, - PendingSourceFreshnessAlertSchema, - PendingTestAlertSchema, -) +from elementary.monitor.fetchers.alerts.schema.pending_alerts import PendingAlertSchema from elementary.utils.log import get_logger logger = get_logger(__name__) @@ -45,83 +33,17 @@ def __init__( self.global_suppression_interval = global_suppression_interval self.override_meta_suppression_interval = override_meta_suppression_interval - def get_new_alerts( - self, - days_back: int, - filter: FiltersSchema = FiltersSchema(), - ) -> AlertsSchema: - new_test_alerts = self.get_test_alerts(days_back, filter) - new_model_alerts = self.get_model_alerts(days_back, filter) - new_source_freshness_alerts = self.get_source_freshness_alerts( - days_back, filter - ) - return AlertsSchema( - tests=new_test_alerts, - models=new_model_alerts, - source_freshnesses=new_source_freshness_alerts, - ) - - def get_test_alerts( - self, - days_back: int, - filter: FiltersSchema = 
FiltersSchema(), - ) -> TestAlertsSchema: - pending_test_alerts = self.alerts_fetcher.query_pending_test_alerts(days_back) - filtered_pending_test_alerts = filter_alerts(pending_test_alerts, filter) - last_alert_sent_times = self.alerts_fetcher.query_last_test_alert_times( - days_back - ) - test_alerts = self._sort_alerts( - filtered_pending_test_alerts, last_alert_sent_times - ) - # Mypy issues with Union types. We need to ignore the are-type - return TestAlertsSchema(send=test_alerts.send, skip=test_alerts.skip) # type: ignore[arg-type] - - def get_model_alerts( - self, - days_back: int, - filter: FiltersSchema = FiltersSchema(), - ) -> ModelAlertsSchema: - pending_model_alerts = self.alerts_fetcher.query_pending_model_alerts(days_back) - filtered_pending_model_alerts = filter_alerts(pending_model_alerts, filter) - last_alert_sent_times = self.alerts_fetcher.query_last_model_alert_times( - days_back - ) - model_alerts = self._sort_alerts( - filtered_pending_model_alerts, last_alert_sent_times - ) - # Mypy issues with Union types. We need to ignore the are-type - return ModelAlertsSchema(send=model_alerts.send, skip=model_alerts.skip) # type: ignore[arg-type] - - def get_source_freshness_alerts( - self, - days_back: int, - filter: FiltersSchema = FiltersSchema(), - ) -> SourceFreshnessAlertsSchema: - pending_source_freshness_alerts = ( - self.alerts_fetcher.query_pending_source_freshness_alerts(days_back) - ) - filtered_pending_source_freshness_alert = filter_alerts( - pending_source_freshness_alerts, filter - ) - last_alert_sent_times = ( - self.alerts_fetcher.query_last_source_freshness_alert_times(days_back) - ) - source_freshness_alerts = self._sort_alerts( - filtered_pending_source_freshness_alert, last_alert_sent_times - ) - # Mypy issues with Union types. 
We need to ignore the are-type - return SourceFreshnessAlertsSchema( - send=source_freshness_alerts.send, skip=source_freshness_alerts.skip # type: ignore[arg-type] + def get_new_alerts(self, days_back: int) -> SortedAlertsSchema: + pending_alerts = self.alerts_fetcher.query_pending_alerts(days_back=days_back) + last_alert_sent_times = self.alerts_fetcher.query_last_alert_times( + days_back=days_back ) + sorted_alerts = self._sort_alerts(pending_alerts, last_alert_sent_times) + return sorted_alerts def skip_alerts( self, - alerts_to_skip: Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], - ], + alerts_to_skip: List[PendingAlertSchema], ) -> None: self.alerts_fetcher.skip_alerts(alerts_to_skip=alerts_to_skip) @@ -130,11 +52,7 @@ def update_sent_alerts(self, alert_ids: List[str]) -> None: def _sort_alerts( self, - pending_alerts: Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], - ], + pending_alerts: List[PendingAlertSchema], last_alert_sent_times: Dict[str, str], ) -> SortedAlertsSchema: suppressed_alerts = self._get_suppressed_alerts( @@ -152,17 +70,11 @@ def _sort_alerts( alerts_to_skip.append(valid_alert) else: alerts_to_send.append(valid_alert) - - # Mypy issues with Union types. 
We need to ignore the are-type - return SortedAlertsSchema(send=alerts_to_send, skip=alerts_to_skip) # type: ignore[arg-type] + return SortedAlertsSchema(send=alerts_to_send, skip=alerts_to_skip) def _get_suppressed_alerts( self, - alerts: Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], - ], + alerts: List[PendingAlertSchema], last_alert_sent_times: Dict[str, str], ) -> List[str]: suppressed_alerts = [] @@ -174,7 +86,7 @@ def _get_suppressed_alerts( logger.debug("Alert without an id detected!") continue - suppression_interval = alert.get_suppression_interval( + suppression_interval = alert.data.get_suppression_interval( self.global_suppression_interval, self.override_meta_suppression_interval, ) @@ -196,21 +108,11 @@ def _get_suppressed_alerts( @staticmethod def _get_latest_alerts( - alerts: Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], - ], + alerts: List[PendingAlertSchema], ) -> List[str]: alert_last_times: DefaultDict[ str, - Optional[ - Union[ - PendingModelAlertSchema, - PendingSourceFreshnessAlertSchema, - PendingTestAlertSchema, - ] - ], + Optional[PendingAlertSchema], ] = defaultdict(lambda: None) latest_alert_ids = [] for alert in alerts: diff --git a/elementary/monitor/api/alerts/schema.py b/elementary/monitor/api/alerts/schema.py index e68cf3cc8..93b048cbf 100644 --- a/elementary/monitor/api/alerts/schema.py +++ b/elementary/monitor/api/alerts/schema.py @@ -1,66 +1,10 @@ -from typing import List, Union +from typing import List from pydantic import BaseModel -from elementary.monitor.fetchers.alerts.schema.pending_alerts import ( - PendingModelAlertSchema, - PendingSourceFreshnessAlertSchema, - PendingTestAlertSchema, -) - - -class TestAlertsSchema(BaseModel): - send: List[PendingTestAlertSchema] - skip: List[PendingTestAlertSchema] - - -class ModelAlertsSchema(BaseModel): - send: List[PendingModelAlertSchema] - skip: 
List[PendingModelAlertSchema] - - -class SourceFreshnessAlertsSchema(BaseModel): - send: List[PendingSourceFreshnessAlertSchema] - skip: List[PendingSourceFreshnessAlertSchema] +from elementary.monitor.fetchers.alerts.schema.pending_alerts import PendingAlertSchema class SortedAlertsSchema(BaseModel): - send: Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], - ] - skip: Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], - ] - - class Config: - smart_union = True - - -class AlertsSchema(BaseModel): - tests: TestAlertsSchema - models: ModelAlertsSchema - source_freshnesses: SourceFreshnessAlertsSchema - - @property - def all_alerts( - self, - ) -> List[ - Union[ - PendingTestAlertSchema, - PendingModelAlertSchema, - PendingSourceFreshnessAlertSchema, - ] - ]: - return [*self.tests.send, *self.models.send, *self.source_freshnesses.send] - - @property - def count(self) -> int: - return ( - len(self.tests.send) - + len(self.models.send) - + len(self.source_freshnesses.send) - ) + send: List[PendingAlertSchema] + skip: List[PendingAlertSchema] diff --git a/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py b/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py index 9559f27ad..04a358d54 100644 --- a/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py +++ b/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py @@ -10,8 +10,9 @@ from elementary.monitor.alerts.model_alert import ModelAlertModel from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel from elementary.monitor.alerts.test_alert import TestAlertModel +from elementary.monitor.api.alerts.alert_filters import filter_alerts from elementary.monitor.api.alerts.alerts import AlertsAPI -from elementary.monitor.api.alerts.schema import AlertsSchema +from elementary.monitor.api.alerts.schema import 
SortedAlertsSchema from elementary.monitor.data_monitoring.alerts.integrations.base_integration import ( BaseIntegration, ) @@ -20,6 +21,7 @@ ) from elementary.monitor.data_monitoring.data_monitoring import DataMonitoring from elementary.monitor.data_monitoring.schema import FiltersSchema +from elementary.monitor.fetchers.alerts.schema.pending_alerts import PendingAlertSchema from elementary.tracking.tracking_interface import Tracking from elementary.utils.log import get_logger @@ -63,15 +65,37 @@ def _get_integration_client(self) -> BaseIntegration: override_config_defaults=self.override_config_defaults, ) - def _fetch_data(self, days_back: int) -> AlertsSchema: + def _populate_data( + self, + dbt_full_refresh: bool = False, + dbt_vars: Optional[dict] = None, + ) -> bool: + logger.info("Running internal dbt run to populate alerts") + success = self.internal_dbt_runner.run( + models="elementary_cli.alerts.alerts_v2", + full_refresh=dbt_full_refresh, + vars=dbt_vars, + ) + self.execution_properties["alerts_populate_success"] = success + if not success: + logger.info("Could not populate alerts successfully") + + return success + + def _fetch_data(self, days_back: int) -> SortedAlertsSchema: return self.alerts_api.get_new_alerts( days_back=days_back, - filter=self.selector_filter, + ) + + def _filter_data(self, data: SortedAlertsSchema) -> SortedAlertsSchema: + return SortedAlertsSchema( + send=filter_alerts(alerts=data.send, alerts_filter=self.selector_filter), + skip=filter_alerts(alerts=data.skip, alerts_filter=self.selector_filter), ) def _format_alerts( self, - alerts: AlertsSchema, + alerts: List[PendingAlertSchema], ) -> List[ Union[ TestAlertModel, @@ -87,9 +111,11 @@ def _format_alerts( default_alerts_group_by_strategy = GroupingType( self.config.slack_group_alerts_by ) - for alert in alerts.all_alerts: - group_alerts_by = alert.group_alerts_by or default_alerts_group_by_strategy - formatted_alert = alert.format_alert( + for alert in alerts: + group_alerts_by 
= ( +                alert.data.group_alerts_by or default_alerts_group_by_strategy +            ) +            formatted_alert = alert.data.format_alert(                 timezone=self.config.timezone,                 report_url=self.config.report_url,                 elementary_database_and_schema=self.elementary_database_and_schema, @@ -174,10 +200,8 @@ def _send_alerts(         # Now update sent alerts counter:         self.execution_properties["sent_alert_count"] = self.sent_alert_count -    def _skip_alerts(self, alerts: AlertsSchema): -        self.alerts_api.skip_alerts(alerts.tests.skip) -        self.alerts_api.skip_alerts(alerts.models.skip) -        self.alerts_api.skip_alerts(alerts.source_freshnesses.skip) +    def _skip_alerts(self, alerts: List[PendingAlertSchema]): +        self.alerts_api.skip_alerts(alerts)      def run_alerts(         self, @@ -185,26 +209,33 @@ def run_alerts(         dbt_full_refresh: bool = False,         dbt_vars: Optional[dict] = None,     ) -> bool: -        logger.info("Running internal dbt run to aggregate alerts") -        success = self.internal_dbt_runner.run( -            models="elementary_cli.alerts.alerts_v2", -            full_refresh=dbt_full_refresh, -            vars=dbt_vars, +        # Populate data +        populated_data_successfully = self._populate_data( +            dbt_full_refresh=dbt_full_refresh, dbt_vars=dbt_vars         ) -        self.execution_properties["alerts_run_success"] = success -        if not success: -            logger.info("Could not aggregate alerts successfully") +        if not populated_data_successfully:             self.success = False             self.execution_properties["success"] = self.success             return self.success +        # Fetch and filter data         alerts = self._fetch_data(days_back) -        self._skip_alerts(alerts) -        formatted_alerts = self._format_alerts(alerts=alerts) +        alerts = self._filter_data(alerts) +        alerts_to_skip = alerts.skip +        alerts_to_send = alerts.send + +        # Skip alerts +        self._skip_alerts(alerts_to_skip) + +        # Format alerts +        formatted_alerts = self._format_alerts(alerts=alerts_to_send) + +        # Send alerts         self._send_alerts(formatted_alerts) -        if self.send_test_message_on_success and alerts.count == 0: + +        if self.send_test_message_on_success and len(alerts_to_send) 
== 0: self._send_test_message() - self.execution_properties["alert_count"] = alerts.count + self.execution_properties["alert_count"] = len(alerts_to_send) self.execution_properties["elementary_test_count"] = len( [ alert @@ -213,7 +244,7 @@ def run_alerts( ] ) self.execution_properties["has_subscribers"] = any( - alert.subscribers for alert in alerts.all_alerts + alert.data.subscribers for alert in alerts_to_send ) self.execution_properties["run_end"] = True self.execution_properties["success"] = self.success diff --git a/elementary/monitor/fetchers/alerts/alerts.py b/elementary/monitor/fetchers/alerts/alerts.py index 593b8ba33..d2f980818 100644 --- a/elementary/monitor/fetchers/alerts/alerts.py +++ b/elementary/monitor/fetchers/alerts/alerts.py @@ -1,5 +1,5 @@ import json -from typing import Dict, List, Optional, Union +from typing import Dict, List, Optional from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner from elementary.clients.fetcher.fetcher import FetcherClient @@ -7,9 +7,6 @@ from elementary.monitor.fetchers.alerts.schema.pending_alerts import ( AlertTypes, PendingAlertSchema, - PendingModelAlertSchema, - PendingSourceFreshnessAlertSchema, - PendingTestAlertSchema, ) from elementary.utils.log import get_logger from elementary.utils.time import get_now_utc_str @@ -30,11 +27,7 @@ def __init__( def skip_alerts( self, - alerts_to_skip: Union[ - List[PendingTestAlertSchema], - List[PendingModelAlertSchema], - List[PendingSourceFreshnessAlertSchema], - ], + alerts_to_skip: List[PendingAlertSchema], ): alert_ids = [alert.id for alert in alerts_to_skip] alert_ids_chunks = self._split_list_to_chunks(alert_ids) @@ -46,7 +39,7 @@ def skip_alerts( quiet=True, ) - def _query_pending_alerts( + def query_pending_alerts( self, days_back: int, type: Optional[AlertTypes] = None ) -> List[PendingAlertSchema]: pending_alerts_results = self.dbt_runner.run_operation( @@ -58,7 +51,7 @@ def _query_pending_alerts( for result in json.loads(pending_alerts_results[0]) 
] - def _query_last_alert_times( + def query_last_alert_times( self, days_back: int, type: Optional[AlertTypes] = None ) -> Dict[str, str]: response = self.dbt_runner.run_operation( @@ -67,54 +60,6 @@ def _query_last_alert_times( ) return json.loads(response[0]) - def query_pending_test_alerts(self, days_back: int) -> List[PendingTestAlertSchema]: - logger.info("Querying test alerts.") - pending_test_alerts_results = self._query_pending_alerts( - days_back=days_back, type=AlertTypes.TEST - ) - return [ - PendingTestAlertSchema(**result.data) - for result in pending_test_alerts_results - ] - - def query_pending_model_alerts( - self, days_back: int - ) -> List[PendingModelAlertSchema]: - logger.info("Querying model alerts.") - pending_model_alerts_results = self._query_pending_alerts( - days_back=days_back, type=AlertTypes.MODEL - ) - return [ - PendingModelAlertSchema(**result.data) - for result in pending_model_alerts_results - ] - - def query_pending_source_freshness_alerts( - self, days_back: int - ) -> List[PendingSourceFreshnessAlertSchema]: - logger.info("Querying source freshness alerts.") - pending_source_freshness_alerts_results = self._query_pending_alerts( - days_back=days_back, type=AlertTypes.SOURCE_FRESHNESS - ) - return [ - PendingSourceFreshnessAlertSchema(**result.data) - for result in pending_source_freshness_alerts_results - ] - - def query_last_test_alert_times(self, days_back: int) -> Dict[str, str]: - logger.info("Querying test alerts last sent times.") - return self._query_last_alert_times(days_back=days_back, type=AlertTypes.TEST) - - def query_last_model_alert_times(self, days_back: int) -> Dict[str, str]: - logger.info("Querying model alerts last sent times.") - return self._query_last_alert_times(days_back=days_back, type=AlertTypes.MODEL) - - def query_last_source_freshness_alert_times(self, days_back: int) -> Dict[str, str]: - logger.info("Querying source freshness alerts last sent times.") - return self._query_last_alert_times( - 
days_back=days_back, type=AlertTypes.SOURCE_FRESHNESS - ) - def update_sent_alerts(self, alert_ids: List[str]) -> None: alert_ids_chunks = self._split_list_to_chunks(alert_ids) logger.info("Update sent alerts") diff --git a/elementary/monitor/fetchers/alerts/schema/alert_data.py b/elementary/monitor/fetchers/alerts/schema/alert_data.py new file mode 100644 index 000000000..40312d3c9 --- /dev/null +++ b/elementary/monitor/fetchers/alerts/schema/alert_data.py @@ -0,0 +1,325 @@ +from datetime import datetime +from typing import Dict, List, Optional, Union + +from pydantic import BaseModel, Field, validator + +from elementary.monitor.alerts.model_alert import ModelAlertModel +from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel +from elementary.monitor.alerts.test_alert import TestAlertModel +from elementary.monitor.data_monitoring.schema import ResourceType +from elementary.utils.dicts import flatten_dict_by_key, merge_dicts_attribute +from elementary.utils.json_utils import ( + try_load_json, + unpack_and_flatten_and_dedup_list_of_strings, + unpack_and_flatten_str_to_list, +) + +ALERTS_CONFIG_KEY = "alerts_config" +CHANNEL_KEY = "channel" +DESCRIPTION_KEY = "description" +OWNER_KEY = "owner" +SUBSCRIBERS_KEY = "subscribers" +ALERT_FIELDS_KEY = "alert_fields" +ALERT_SUPPRESSION_INTERVAL_KEY = "alert_suppression_interval" +GROUP_ALERTS_BY_KEY = "slack_group_alerts_by" + + +class BaseAlertDataSchema(BaseModel): + id: str + alert_class_id: str + model_unique_id: Optional[str] = None + detected_at: datetime + database_name: Optional[str] = None + schema_name: str + tags: Optional[List[str]] = None + owners: Optional[List[str]] = None + model_meta: Optional[Dict] = None + status: str + + @property + def unified_meta(self) -> Dict: + return self.flatten_model_meta + + @property + def flatten_model_meta(self) -> Dict: + return self._flatten_meta(self.model_meta) + + @property + def alert_suppression_interval(self) -> Optional[int]: + return 
self.unified_meta.get(ALERT_SUPPRESSION_INTERVAL_KEY) + + @property + def group_alerts_by(self) -> Optional[str]: + return self.unified_meta.get(GROUP_ALERTS_BY_KEY) + + @property + def unified_owners(self) -> List[str]: + # Make sure we return both meta defined owners and config defined owners. + config_owners = self.owners or [] + meta_owners = self._get_alert_meta_attrs(OWNER_KEY) + return list(set(config_owners + meta_owners)) + + @property + def subscribers(self) -> List[str]: + return self._get_alert_meta_attrs(SUBSCRIBERS_KEY) + + @property + def description(self) -> Optional[str]: + return self.unified_meta.get(DESCRIPTION_KEY) + + @property + def alert_fields(self) -> List[str]: + return self.unified_meta.get(ALERT_FIELDS_KEY, []) + + @validator("model_meta", pre=True, always=True) + def validate_model_meta(cls, model_meta: Optional[Dict]) -> Dict: + return cls._validate_dict(model_meta) + + @validator("tags", pre=True, always=True) + def validate_tags(cls, tags: Optional[Union[List[str], str]]): + return unpack_and_flatten_and_dedup_list_of_strings(tags) + + @validator("owners", pre=True, always=True) + def validate_owners(cls, owners: Optional[Union[List[str], str]]): + return unpack_and_flatten_and_dedup_list_of_strings(owners) + + @staticmethod + def _flatten_meta(meta: Optional[Dict] = None) -> Dict: + return flatten_dict_by_key(meta, ALERTS_CONFIG_KEY) if meta else dict() + + def _get_alert_meta_attrs(self, meta_key: str) -> List[str]: + attrs: List[str] = merge_dicts_attribute( + dicts=[self.flatten_model_meta], attribute_key=meta_key + ) + return unpack_and_flatten_and_dedup_list_of_strings(attrs) + + @staticmethod + def _validate_dict(value: Optional[Dict]) -> Dict: + if not value: + return dict() + return try_load_json(value) + + def format_alert( + self, + timezone: Optional[str] = None, + report_url: Optional[str] = None, + elementary_database_and_schema: Optional[str] = None, + global_suppression_interval: int = 0, + override_config: bool = 
False, + ): + raise NotImplementedError + + def get_suppression_interval( + self, + interval_from_cli: int, + override_by_cli: bool = False, + ) -> int: + interval_from_alert = self.alert_suppression_interval + if interval_from_alert is None or override_by_cli: + return interval_from_cli + return interval_from_alert + + +class TestAlertDataSchema(BaseAlertDataSchema): + test_unique_id: str + table_name: Optional[str] = None + column_name: Optional[str] = None + test_type: str + test_sub_type: str + test_results_description: str + test_results_query: Optional[str] = None + test_rows_sample: Optional[List[Dict]] = None + other: Optional[Dict] = None + test_name: str + test_short_name: str + test_params: Optional[Dict] = None + severity: str + test_meta: Optional[Dict] = None + elementary_unique_id: str + resource_type: ResourceType = Field(ResourceType.TEST, const=True) + + @property + def flatten_model_meta(self) -> Dict: + return self._flatten_meta(self.model_meta) + + @property + def flatten_test_meta(self) -> Dict: + return self._flatten_meta(self.test_meta) + + @property + def unified_meta(self) -> Dict: + return {**self.flatten_model_meta, **self.flatten_test_meta} + + @validator("test_rows_sample", pre=True, always=True) + def validate_test_rows_sample(cls, test_rows_sample): + if not test_rows_sample: + return [] + return unpack_and_flatten_str_to_list(test_rows_sample) + + @validator("test_params", pre=True, always=True) + def validate_test_params(cls, test_params: Optional[Dict]) -> Dict: + return cls._validate_dict(test_params) + + @validator("test_meta", pre=True, always=True) + def validate_test_meta(cls, test_meta: Optional[Dict]) -> Dict: + return cls._validate_dict(test_meta) + + @validator("other", pre=True, always=True) + def validate_other(cls, other: Optional[Dict]) -> Dict: + return cls._validate_dict(other) + + def _get_alert_meta_attrs(self, meta_key: str) -> List[str]: + attrs: List[str] = merge_dicts_attribute( + 
dicts=[self.flatten_model_meta, self.flatten_test_meta], + attribute_key=meta_key, + ) + return unpack_and_flatten_and_dedup_list_of_strings(attrs) + + def format_alert( + self, + timezone: Optional[str] = None, + report_url: Optional[str] = None, + elementary_database_and_schema: Optional[str] = None, + global_suppression_interval: int = 0, + override_config: bool = False, + ) -> TestAlertModel: + return TestAlertModel( + id=self.id, + test_unique_id=self.test_unique_id, + elementary_unique_id=self.elementary_unique_id, + test_name=self.test_name, + severity=self.severity, + table_name=self.table_name, + test_type=self.test_type, + test_sub_type=self.test_sub_type, + test_results_description=self.test_results_description, + test_results_query=self.test_results_query, + test_short_name=self.test_short_name, + test_description=self.description, + other=self.other, + test_params=self.test_params, + test_rows_sample=self.test_rows_sample, + column_name=self.column_name, + alert_class_id=self.alert_class_id, + model_unique_id=self.model_unique_id, + detected_at=self.detected_at, + database_name=self.database_name, + schema_name=self.schema_name, + owners=self.unified_owners, + tags=self.tags, + subscribers=self.subscribers, + status=self.status, + model_meta=self.flatten_model_meta, + test_meta=self.flatten_test_meta, + suppression_interval=self.get_suppression_interval( + global_suppression_interval, override_config + ), + timezone=timezone, + report_url=report_url, + alert_fields=self.alert_fields, + elementary_database_and_schema=elementary_database_and_schema, + ) + + +class ModelAlertDataSchema(BaseAlertDataSchema): + alias: str + path: str + original_path: str + materialization: str + full_refresh: bool + message: str + resource_type: ResourceType = Field(ResourceType.MODEL, const=True) + + def format_alert( + self, + timezone: Optional[str] = None, + report_url: Optional[str] = None, + elementary_database_and_schema: Optional[str] = None, + 
global_suppression_interval: int = 0, + override_config: bool = False, + ) -> ModelAlertModel: + return ModelAlertModel( + id=self.id, + alias=self.alias, + path=self.path, + original_path=self.original_path, + materialization=self.materialization, + message=self.message, + full_refresh=self.full_refresh, + alert_class_id=self.alert_class_id, + model_unique_id=self.model_unique_id, + detected_at=self.detected_at, + database_name=self.database_name, + schema_name=self.schema_name, + owners=self.unified_owners, + tags=self.tags, + subscribers=self.subscribers, + status=self.status, + model_meta=self.flatten_model_meta, + suppression_interval=self.get_suppression_interval( + global_suppression_interval, override_config + ), + timezone=timezone, + report_url=report_url, + alert_fields=self.alert_fields, + elementary_database_and_schema=elementary_database_and_schema, + ) + + +class SourceFreshnessAlertDataSchema(BaseAlertDataSchema): + source_freshness_execution_id: str + snapshotted_at: Optional[datetime] = None + max_loaded_at: Optional[datetime] = None + max_loaded_at_time_ago_in_s: Optional[int] = None + source_name: str + identifier: str + error_after: Optional[str] = None + warn_after: Optional[str] = None + filter: Optional[str] = None + original_status: str + path: str + error: Optional[str] = None + freshness_description: Optional[str] = None + resource_type: ResourceType = Field(ResourceType.SOURCE_FRESHNESS, const=True) + + def format_alert( + self, + timezone: Optional[str] = None, + report_url: Optional[str] = None, + elementary_database_and_schema: Optional[str] = None, + global_suppression_interval: int = 0, + override_config: bool = False, + ) -> SourceFreshnessAlertModel: + return SourceFreshnessAlertModel( + id=self.id, + source_name=self.source_name, + identifier=self.identifier, + status=self.status, + original_status=self.original_status, + error_after=self.error_after, + warn_after=self.warn_after, + path=self.path, + error=self.error, + 
source_freshness_execution_id=self.source_freshness_execution_id, + snapshotted_at=self.snapshotted_at, + max_loaded_at=self.max_loaded_at, + max_loaded_at_time_ago_in_s=self.max_loaded_at_time_ago_in_s, + filter=self.filter, + freshness_description=self.freshness_description, + alert_class_id=self.alert_class_id, + model_unique_id=self.model_unique_id, + detected_at=self.detected_at, + database_name=self.database_name, + schema_name=self.schema_name, + owners=self.unified_owners, + tags=self.tags, + subscribers=self.subscribers, + model_meta=self.flatten_model_meta, + suppression_interval=self.get_suppression_interval( + global_suppression_interval, override_config + ), + timezone=timezone, + report_url=report_url, + alert_fields=self.alert_fields, + elementary_database_and_schema=elementary_database_and_schema, + ) diff --git a/elementary/monitor/fetchers/alerts/schema/pending_alerts.py b/elementary/monitor/fetchers/alerts/schema/pending_alerts.py index 6ef4e5c64..6b36f3318 100644 --- a/elementary/monitor/fetchers/alerts/schema/pending_alerts.py +++ b/elementary/monitor/fetchers/alerts/schema/pending_alerts.py @@ -1,19 +1,15 @@ from datetime import datetime from enum import Enum -from typing import Any, Dict, List, Optional, Union +from typing import Optional, Union -from pydantic import BaseModel, Field, validator +from pydantic import BaseModel, root_validator -from elementary.monitor.alerts.model_alert import ModelAlertModel -from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel -from elementary.monitor.alerts.test_alert import TestAlertModel -from elementary.monitor.data_monitoring.schema import ResourceType -from elementary.utils.dicts import flatten_dict_by_key, merge_dicts_attribute -from elementary.utils.json_utils import ( - try_load_json, - unpack_and_flatten_and_dedup_list_of_strings, - unpack_and_flatten_str_to_list, +from elementary.monitor.fetchers.alerts.schema.alert_data import ( + ModelAlertDataSchema, + 
SourceFreshnessAlertDataSchema, + TestAlertDataSchema, ) +from elementary.utils.json_utils import try_load_json ALERTS_CONFIG_KEY = "alerts_config" CHANNEL_KEY = "channel" @@ -45,314 +41,30 @@ class PendingAlertSchema(BaseModel): created_at: datetime updated_at: datetime status: AlertStatus - data: Dict[str, Any] + data: Union[ + TestAlertDataSchema, ModelAlertDataSchema, SourceFreshnessAlertDataSchema + ] sent_at: Optional[datetime] = None class Config: + smart_union = True # Make sure that serializing Enum return values use_enum_values = True - @validator("data", pre=True, always=True) - def validate_data(cls, data: Optional[Union[str, Dict]]): - return try_load_json(data) + @root_validator(pre=True) + def parse_data(cls, values: dict) -> dict: + new_values = {**values} + alert_type = AlertTypes(values.get("type")) + raw_data = try_load_json(values.get("data")) -class BasePendingAlertSchema(BaseModel): - id: str - alert_class_id: str - model_unique_id: Optional[str] = None - detected_at: datetime - database_name: Optional[str] = None - schema_name: str - tags: Optional[List[str]] = None - owners: Optional[List[str]] = None - model_meta: Optional[Dict] = None - status: str - - @property - def unified_meta(self) -> Dict: - return self.flatten_model_meta - - @property - def flatten_model_meta(self) -> Dict: - return self._flatten_meta(self.model_meta) - - @property - def alert_suppression_interval(self) -> Optional[int]: - return self.unified_meta.get(ALERT_SUPPRESSION_INTERVAL_KEY) - - @property - def group_alerts_by(self) -> Optional[str]: - return self.unified_meta.get(GROUP_ALERTS_BY_KEY) - - @property - def unified_owners(self) -> List[str]: - # Make sure we return both meta defined owners and config defined owners. 
- config_owners = self.owners or [] - meta_owners = self._get_alert_meta_attrs(OWNER_KEY) - return list(set(config_owners + meta_owners)) - - @property - def subscribers(self) -> List[str]: - return self._get_alert_meta_attrs(SUBSCRIBERS_KEY) - - @property - def description(self) -> Optional[str]: - return self.unified_meta.get(DESCRIPTION_KEY) - - @property - def alert_fields(self) -> List[str]: - return self.unified_meta.get(ALERT_FIELDS_KEY, []) - - @validator("model_meta", pre=True, always=True) - def validate_model_meta(cls, model_meta: Optional[Dict]) -> Dict: - return cls._validate_dict(model_meta) - - @validator("tags", pre=True, always=True) - def validate_tags(cls, tags: Optional[Union[List[str], str]]): - return unpack_and_flatten_and_dedup_list_of_strings(tags) - - @validator("owners", pre=True, always=True) - def validate_owners(cls, owners: Optional[Union[List[str], str]]): - return unpack_and_flatten_and_dedup_list_of_strings(owners) - - @staticmethod - def _flatten_meta(meta: Optional[Dict] = None) -> Dict: - return flatten_dict_by_key(meta, ALERTS_CONFIG_KEY) if meta else dict() - - def _get_alert_meta_attrs(self, meta_key: str) -> List[str]: - attrs: List[str] = merge_dicts_attribute( - dicts=[self.flatten_model_meta], attribute_key=meta_key - ) - return unpack_and_flatten_and_dedup_list_of_strings(attrs) - - def get_suppression_interval( - self, - interval_from_cli: int, - override_by_cli: bool = False, - ) -> int: - interval_from_alert = self.alert_suppression_interval - if interval_from_alert is None or override_by_cli: - return interval_from_cli - return interval_from_alert - - @staticmethod - def _validate_dict(value: Optional[Dict]) -> Dict: - if not value: - return dict() - return try_load_json(value) - - def format_alert( - self, - timezone: Optional[str] = None, - report_url: Optional[str] = None, - elementary_database_and_schema: Optional[str] = None, - global_suppression_interval: int = 0, - override_config: bool = False, - ): - raise 
NotImplementedError - - -class PendingTestAlertSchema(BasePendingAlertSchema): - test_unique_id: str - table_name: Optional[str] = None - column_name: Optional[str] = None - test_type: str - test_sub_type: str - test_results_description: str - test_results_query: Optional[str] = None - test_rows_sample: Optional[List[Dict]] = None - other: Optional[Dict] = None - test_name: str - test_short_name: str - test_params: Optional[Dict] = None - severity: str - test_meta: Optional[Dict] = None - elementary_unique_id: str - resource_type: ResourceType = Field(ResourceType.TEST, const=True) - - @property - def flatten_model_meta(self) -> Dict: - return self._flatten_meta(self.model_meta) - - @property - def flatten_test_meta(self) -> Dict: - return self._flatten_meta(self.test_meta) - - @property - def unified_meta(self) -> Dict: - return {**self.flatten_model_meta, **self.flatten_test_meta} - - @validator("test_rows_sample", pre=True, always=True) - def validate_test_rows_sample(cls, test_rows_sample): - if not test_rows_sample: - return [] - return unpack_and_flatten_str_to_list(test_rows_sample) - - @validator("test_params", pre=True, always=True) - def validate_test_params(cls, test_params: Optional[Dict]) -> Dict: - return cls._validate_dict(test_params) - - @validator("test_meta", pre=True, always=True) - def validate_test_meta(cls, test_meta: Optional[Dict]) -> Dict: - return cls._validate_dict(test_meta) - - @validator("other", pre=True, always=True) - def validate_other(cls, other: Optional[Dict]) -> Dict: - return cls._validate_dict(other) - - def _get_alert_meta_attrs(self, meta_key: str) -> List[str]: - attrs: List[str] = merge_dicts_attribute( - dicts=[self.flatten_model_meta, self.flatten_test_meta], - attribute_key=meta_key, - ) - return unpack_and_flatten_and_dedup_list_of_strings(attrs) - - def format_alert( - self, - timezone: Optional[str] = None, - report_url: Optional[str] = None, - elementary_database_and_schema: Optional[str] = None, - 
global_suppression_interval: int = 0, - override_config: bool = False, - ) -> TestAlertModel: - return TestAlertModel( - id=self.id, - test_unique_id=self.test_unique_id, - elementary_unique_id=self.elementary_unique_id, - test_name=self.test_name, - severity=self.severity, - table_name=self.table_name, - test_type=self.test_type, - test_sub_type=self.test_sub_type, - test_results_description=self.test_results_description, - test_results_query=self.test_results_query, - test_short_name=self.test_short_name, - test_description=self.description, - other=self.other, - test_params=self.test_params, - test_rows_sample=self.test_rows_sample, - column_name=self.column_name, - alert_class_id=self.alert_class_id, - model_unique_id=self.model_unique_id, - detected_at=self.detected_at, - database_name=self.database_name, - schema_name=self.schema_name, - owners=self.unified_owners, - tags=self.tags, - subscribers=self.subscribers, - status=self.status, - model_meta=self.flatten_model_meta, - test_meta=self.flatten_test_meta, - suppression_interval=self.get_suppression_interval( - global_suppression_interval, override_config - ), - timezone=timezone, - report_url=report_url, - alert_fields=self.alert_fields, - elementary_database_and_schema=elementary_database_and_schema, - ) - - -class PendingModelAlertSchema(BasePendingAlertSchema): - alias: str - path: str - original_path: str - materialization: str - full_refresh: bool - message: str - resource_type: ResourceType = Field(ResourceType.MODEL, const=True) - - def format_alert( - self, - timezone: Optional[str] = None, - report_url: Optional[str] = None, - elementary_database_and_schema: Optional[str] = None, - global_suppression_interval: int = 0, - override_config: bool = False, - ) -> ModelAlertModel: - return ModelAlertModel( - id=self.id, - alias=self.alias, - path=self.path, - original_path=self.original_path, - materialization=self.materialization, - message=self.message, - full_refresh=self.full_refresh, - 
alert_class_id=self.alert_class_id, - model_unique_id=self.model_unique_id, - detected_at=self.detected_at, - database_name=self.database_name, - schema_name=self.schema_name, - owners=self.unified_owners, - tags=self.tags, - subscribers=self.subscribers, - status=self.status, - model_meta=self.flatten_model_meta, - suppression_interval=self.get_suppression_interval( - global_suppression_interval, override_config - ), - timezone=timezone, - report_url=report_url, - alert_fields=self.alert_fields, - elementary_database_and_schema=elementary_database_and_schema, - ) - - -class PendingSourceFreshnessAlertSchema(BasePendingAlertSchema): - source_freshness_execution_id: str - snapshotted_at: Optional[datetime] = None - max_loaded_at: Optional[datetime] = None - max_loaded_at_time_ago_in_s: Optional[int] = None - source_name: str - identifier: str - error_after: Optional[str] = None - warn_after: Optional[str] = None - filter: Optional[str] = None - original_status: str - path: str - error: Optional[str] = None - freshness_description: Optional[str] = None - resource_type: ResourceType = Field(ResourceType.SOURCE_FRESHNESS, const=True) + data = None + if alert_type is AlertTypes.TEST: + data = TestAlertDataSchema(**raw_data) + elif alert_type is AlertTypes.MODEL: + data = ModelAlertDataSchema(**raw_data) # type: ignore[assignment] + elif alert_type is AlertTypes.SOURCE_FRESHNESS: + data = SourceFreshnessAlertDataSchema(**raw_data) # type: ignore[assignment] - def format_alert( - self, - timezone: Optional[str] = None, - report_url: Optional[str] = None, - elementary_database_and_schema: Optional[str] = None, - global_suppression_interval: int = 0, - override_config: bool = False, - ) -> SourceFreshnessAlertModel: - return SourceFreshnessAlertModel( - id=self.id, - source_name=self.source_name, - identifier=self.identifier, - status=self.status, - original_status=self.original_status, - error_after=self.error_after, - warn_after=self.warn_after, - path=self.path, - 
error=self.error, - source_freshness_execution_id=self.source_freshness_execution_id, - snapshotted_at=self.snapshotted_at, - max_loaded_at=self.max_loaded_at, - max_loaded_at_time_ago_in_s=self.max_loaded_at_time_ago_in_s, - filter=self.filter, - freshness_description=self.freshness_description, - alert_class_id=self.alert_class_id, - model_unique_id=self.model_unique_id, - detected_at=self.detected_at, - database_name=self.database_name, - schema_name=self.schema_name, - owners=self.unified_owners, - tags=self.tags, - subscribers=self.subscribers, - model_meta=self.flatten_model_meta, - suppression_interval=self.get_suppression_interval( - global_suppression_interval, override_config - ), - timezone=timezone, - report_url=report_url, - alert_fields=self.alert_fields, - elementary_database_and_schema=elementary_database_and_schema, - ) + new_values["data"] = data + return new_values diff --git a/tests/mocks/fetchers/alerts_fetcher_mock.py b/tests/mocks/fetchers/alerts_fetcher_mock.py index c5051b670..33b0737b6 100644 --- a/tests/mocks/fetchers/alerts_fetcher_mock.py +++ b/tests/mocks/fetchers/alerts_fetcher_mock.py @@ -1,11 +1,11 @@ from datetime import datetime, timedelta +from typing import List from elementary.config.config import Config from elementary.monitor.fetchers.alerts.alerts import AlertsFetcher from elementary.monitor.fetchers.alerts.schema.pending_alerts import ( - PendingModelAlertSchema, - PendingSourceFreshnessAlertSchema, - PendingTestAlertSchema, + AlertTypes, + PendingAlertSchema, ) from elementary.utils.time import DATETIME_FORMAT from tests.mocks.dbt_runner_mock import MockDbtRunner @@ -22,7 +22,7 @@ def __init__(self): mock_dbt_runner, config, elementary_database_and_schema="test.test" ) - def query_pending_test_alerts(self, *args, **kwargs): + def query_pending_alerts(self, *args, **kwargs) -> List[PendingAlertSchema]: PENDDING_TEST_ALERTS_MOCK_DATA = [ # Alert within suppression interval dict( @@ -183,13 +183,6 @@ def 
query_pending_test_alerts(self, *args, **kwargs): ), ] - pending_test_alerts = [ - PendingTestAlertSchema(**pending_alert) - for pending_alert in PENDDING_TEST_ALERTS_MOCK_DATA - ] - return pending_test_alerts - - def query_pending_model_alerts(self, *args, **kwargs): PENDDING_MODEL_ALERTS_MOCK_DATA = [ # Alert within suppression interval dict( @@ -300,13 +293,6 @@ def query_pending_model_alerts(self, *args, **kwargs): ), ] - pending_model_alerts = [ - PendingModelAlertSchema(**pending_alert) - for pending_alert in PENDDING_MODEL_ALERTS_MOCK_DATA - ] - return pending_model_alerts - - def query_pending_source_freshness_alerts(self, *args, **kwargs): PENDDING_SOURCE_FRESHNESS_ALERTS_MOCK_DATA = [ # Alert within suppression interval dict( @@ -447,13 +433,54 @@ def query_pending_source_freshness_alerts(self, *args, **kwargs): ), ] + pending_test_alerts = [ + PendingAlertSchema( + id=pending_alert["id"], + alert_class_id=pending_alert["alert_class_id"], + type=AlertTypes.TEST, + detected_at=pending_alert["detected_at"], + created_at=pending_alert["detected_at"], + updated_at=pending_alert["detected_at"], + status="pending", + data=pending_alert, + ) + for pending_alert in PENDDING_TEST_ALERTS_MOCK_DATA + ] + + pending_model_alerts = [ + PendingAlertSchema( + id=pending_alert["id"], + alert_class_id=pending_alert["alert_class_id"], + type=AlertTypes.MODEL, + detected_at=pending_alert["detected_at"], + created_at=pending_alert["detected_at"], + updated_at=pending_alert["detected_at"], + status="pending", + data=pending_alert, + ) + for pending_alert in PENDDING_MODEL_ALERTS_MOCK_DATA + ] + pending_source_freshness_alerts = [ - PendingSourceFreshnessAlertSchema(**pending_alert) + PendingAlertSchema( + id=pending_alert["id"], + alert_class_id=pending_alert["alert_class_id"], + type=AlertTypes.SOURCE_FRESHNESS, + detected_at=pending_alert["detected_at"], + created_at=pending_alert["detected_at"], + updated_at=pending_alert["detected_at"], + status="pending", + 
data=pending_alert, + ) for pending_alert in PENDDING_SOURCE_FRESHNESS_ALERTS_MOCK_DATA ] - return pending_source_freshness_alerts + return [ + *pending_test_alerts, + *pending_model_alerts, + *pending_source_freshness_alerts, + ] - def query_last_test_alert_times(self, *args, **kwargs): + def query_last_alert_times(self, *args, **kwargs): return { "test_id_1.column.generic": ( CURRENT_DATETIME_UTC - timedelta(hours=1.5) @@ -464,30 +491,22 @@ def query_last_test_alert_times(self, *args, **kwargs): "test_id_4.column.generic": ( CURRENT_DATETIME_UTC - timedelta(hours=1.5) ).strftime(DATETIME_FORMAT), - } - - def query_last_model_alert_times(self, *args, **kwargs): - return dict( - model_id_1=(CURRENT_DATETIME_UTC - timedelta(hours=1.5)).strftime( + "model_id_1": (CURRENT_DATETIME_UTC - timedelta(hours=1.5)).strftime( DATETIME_FORMAT ), - model_id_2=(CURRENT_DATETIME_UTC - timedelta(minutes=1)).strftime( + "model_id_2": (CURRENT_DATETIME_UTC - timedelta(minutes=1)).strftime( DATETIME_FORMAT ), - model_id_4=(CURRENT_DATETIME_UTC - timedelta(hours=1.5)).strftime( + "model_id_4": (CURRENT_DATETIME_UTC - timedelta(hours=1.5)).strftime( DATETIME_FORMAT ), - ) - - def query_last_source_freshness_alert_times(self, *args, **kwargs): - return dict( - source_id_1=(CURRENT_DATETIME_UTC - timedelta(hours=1.5)).strftime( + "source_id_1": (CURRENT_DATETIME_UTC - timedelta(hours=1.5)).strftime( DATETIME_FORMAT ), - source_id_2=(CURRENT_DATETIME_UTC - timedelta(minutes=1)).strftime( + "source_id_2": (CURRENT_DATETIME_UTC - timedelta(minutes=1)).strftime( DATETIME_FORMAT ), - source_id_4=(CURRENT_DATETIME_UTC - timedelta(hours=1.5)).strftime( + "source_id_4": (CURRENT_DATETIME_UTC - timedelta(hours=1.5)).strftime( DATETIME_FORMAT ), - ) + } diff --git a/tests/unit/monitor/api/alerts/test_alert_filters.py b/tests/unit/monitor/api/alerts/test_alert_filters.py index c2bef3038..2df3f7e8c 100644 --- a/tests/unit/monitor/api/alerts/test_alert_filters.py +++ 
b/tests/unit/monitor/api/alerts/test_alert_filters.py @@ -18,257 +18,346 @@ SupportedFilterTypes, ) from elementary.monitor.fetchers.alerts.schema.pending_alerts import ( - PendingModelAlertSchema, - PendingSourceFreshnessAlertSchema, - PendingTestAlertSchema, + AlertTypes, + PendingAlertSchema, ) def initial_alerts(): test_alerts = [ - PendingTestAlertSchema( + PendingAlertSchema( id="1", alert_class_id="test_id_1", - model_unique_id="elementary.model_id_1", - test_unique_id="test_id_1", - test_name="test_1", - test_created_at="2022-10-10 10:10:10", - tags='["one", "two"]', - model_meta=dict(owner='["jeff", "john"]'), - status="fail", - elementary_unique_id="elementary.model_id_1.test_id_1.9cf2f5f6ad.None.generic", + type=AlertTypes.TEST, detected_at="2022-10-10 10:00:00", - database_name="test_db", - schema_name="test_schema", - table_name="table", - suppression_status="pending", - test_type="dbt_test", - test_sub_type="generic", - test_results_description="a mock alert", - test_results_query="select * from table", - test_short_name="test_1", - severity="ERROR", + created_at="2022-10-10 10:00:00", + updated_at="2022-10-10 10:00:00", + status="pending", + data=dict( + id="1", + alert_class_id="test_id_1", + model_unique_id="elementary.model_id_1", + test_unique_id="test_id_1", + test_name="test_1", + test_created_at="2022-10-10 10:10:10", + tags='["one", "two"]', + model_meta=dict(owner='["jeff", "john"]'), + status="fail", + elementary_unique_id="elementary.model_id_1.test_id_1.9cf2f5f6ad.None.generic", + detected_at="2022-10-10 10:00:00", + database_name="test_db", + schema_name="test_schema", + table_name="table", + suppression_status="pending", + test_type="dbt_test", + test_sub_type="generic", + test_results_description="a mock alert", + test_results_query="select * from table", + test_short_name="test_1", + severity="ERROR", + ), ), - PendingTestAlertSchema( + PendingAlertSchema( id="2", alert_class_id="test_id_2", - model_unique_id="elementary.model_id_1", 
- test_unique_id="test_id_2", - test_name="test_2", - test_created_at="2022-10-10 09:10:10", - tags='["three"]', - model_meta=dict(owner='["jeff", "john"]'), - status="fail", - elementary_unique_id="elementary.model_id_1.test_id_2.9cf2f5f6ad.None.generic", + type=AlertTypes.TEST, detected_at="2022-10-10 10:00:00", - database_name="test_db", - schema_name="test_schema", - table_name="table", - suppression_status="pending", - test_type="dbt_test", - test_sub_type="generic", - test_results_description="a mock alert", - test_results_query="select * from table", - test_short_name="test_2", - severity="ERROR", + created_at="2022-10-10 10:00:00", + updated_at="2022-10-10 10:00:00", + status="pending", + data=dict( + id="2", + alert_class_id="test_id_2", + model_unique_id="elementary.model_id_1", + test_unique_id="test_id_2", + test_name="test_2", + test_created_at="2022-10-10 09:10:10", + tags='["three"]', + model_meta=dict(owner='["jeff", "john"]'), + status="fail", + elementary_unique_id="elementary.model_id_1.test_id_2.9cf2f5f6ad.None.generic", + detected_at="2022-10-10 10:00:00", + database_name="test_db", + schema_name="test_schema", + table_name="table", + suppression_status="pending", + test_type="dbt_test", + test_sub_type="generic", + test_results_description="a mock alert", + test_results_query="select * from table", + test_short_name="test_2", + severity="ERROR", + ), ), - PendingTestAlertSchema( + PendingAlertSchema( id="3", alert_class_id="test_id_3", - model_unique_id="elementary.model_id_2", - test_unique_id="test_id_3", - test_name="test_3", - test_created_at="2022-10-10 10:10:10", - # invalid tag - tags="one", - model_meta=dict(owner='["john"]'), - status="fail", - elementary_unique_id="elementary.model_id_1.test_id_3.9cf2f5f6ad.None.generic", + type=AlertTypes.TEST, detected_at="2022-10-10 10:00:00", - database_name="test_db", - schema_name="test_schema", - table_name="table", - suppression_status="pending", - test_type="dbt_test", - 
test_sub_type="generic", - test_results_description="a mock alert", - test_results_query="select * from table", - test_short_name="test_3", - severity="ERROR", + created_at="2022-10-10 10:00:00", + updated_at="2022-10-10 10:00:00", + status="pending", + data=dict( + id="3", + alert_class_id="test_id_3", + model_unique_id="elementary.model_id_2", + test_unique_id="test_id_3", + test_name="test_3", + test_created_at="2022-10-10 10:10:10", + # invalid tag + tags="one", + model_meta=dict(owner='["john"]'), + status="fail", + elementary_unique_id="elementary.model_id_1.test_id_3.9cf2f5f6ad.None.generic", + detected_at="2022-10-10 10:00:00", + database_name="test_db", + schema_name="test_schema", + table_name="table", + suppression_status="pending", + test_type="dbt_test", + test_sub_type="generic", + test_results_description="a mock alert", + test_results_query="select * from table", + test_short_name="test_3", + severity="ERROR", + ), ), - PendingTestAlertSchema( + PendingAlertSchema( id="4", alert_class_id="test_id_4", - model_unique_id="elementary.model_id_2", - test_unique_id="test_id_4", - test_name="test_4", - test_created_at="2022-10-10 09:10:10", - tags='["three", "four"]', - model_meta=dict(owner='["jeff"]'), - status="warn", - elementary_unique_id="elementary.model_id_1.test_id_4.9cf2f5f6ad.None.generic", + type=AlertTypes.TEST, detected_at="2022-10-10 10:00:00", - database_name="test_db", - schema_name="test_schema", - table_name="table", - suppression_status="pending", - test_type="dbt_test", - test_sub_type="generic", - test_results_description="a mock alert", - test_results_query="select * from table", - test_short_name="test_4", - severity="ERROR", + created_at="2022-10-10 10:00:00", + updated_at="2022-10-10 10:00:00", + status="pending", + data=dict( + id="4", + alert_class_id="test_id_4", + model_unique_id="elementary.model_id_2", + test_unique_id="test_id_4", + test_name="test_4", + test_created_at="2022-10-10 09:10:10", + tags='["three", "four"]', + 
model_meta=dict(owner='["jeff"]'), + status="warn", + elementary_unique_id="elementary.model_id_1.test_id_4.9cf2f5f6ad.None.generic", + detected_at="2022-10-10 10:00:00", + database_name="test_db", + schema_name="test_schema", + table_name="table", + suppression_status="pending", + test_type="dbt_test", + test_sub_type="generic", + test_results_description="a mock alert", + test_results_query="select * from table", + test_short_name="test_4", + severity="ERROR", + ), ), ] model_alerts = [ - PendingModelAlertSchema( + PendingAlertSchema( id="1", alert_class_id="elementary.model_id_1", - model_unique_id="elementary.model_id_1", - alias="modely", - path="my/path", - original_path="", - materialization="table", - message="", - full_refresh=False, + type=AlertTypes.MODEL, detected_at="2022-10-10 10:00:00", - alert_suppression_interval=0, - tags='["one", "two"]', - model_meta=dict(owner='["jeff", "john"]'), - status="error", - database_name="test_db", - schema_name="test_schema", - suppression_status="pending", + created_at="2022-10-10 10:00:00", + updated_at="2022-10-10 10:00:00", + status="pending", + data=dict( + id="1", + alert_class_id="elementary.model_id_1", + model_unique_id="elementary.model_id_1", + alias="modely", + path="my/path", + original_path="", + materialization="table", + message="", + full_refresh=False, + detected_at="2022-10-10 10:00:00", + alert_suppression_interval=0, + tags='["one", "two"]', + model_meta=dict(owner='["jeff", "john"]'), + status="error", + database_name="test_db", + schema_name="test_schema", + suppression_status="pending", + ), ), - PendingModelAlertSchema( + PendingAlertSchema( id="2", alert_class_id="elementary.model_id_1", - model_unique_id="elementary.model_id_1", - alias="modely", - path="my/path", - original_path="", - materialization="table", - message="", - full_refresh=False, - detected_at="2022-10-10 09:00:00", - alert_suppression_interval=3, - tags='["three"]', - model_meta=dict(owner='["john"]'), - status="error", - 
database_name="test_db", - schema_name="test_schema", - suppression_status="pending", + type=AlertTypes.MODEL, + detected_at="2022-10-10 10:00:00", + created_at="2022-10-10 10:00:00", + updated_at="2022-10-10 10:00:00", + status="pending", + data=dict( + id="2", + alert_class_id="elementary.model_id_1", + model_unique_id="elementary.model_id_1", + alias="modely", + path="my/path", + original_path="", + materialization="table", + message="", + full_refresh=False, + detected_at="2022-10-10 09:00:00", + alert_suppression_interval=3, + tags='["three"]', + model_meta=dict(owner='["john"]'), + status="error", + database_name="test_db", + schema_name="test_schema", + suppression_status="pending", + ), ), - PendingModelAlertSchema( + PendingAlertSchema( id="3", alert_class_id="elementary.model_id_2", - model_unique_id="elementary.model_id_2", - alias="model2", - path="my/path2", - original_path="", - materialization="table", - message="", - full_refresh=False, + type=AlertTypes.MODEL, detected_at="2022-10-10 08:00:00", - alert_suppression_interval=1, - tags='["three", "four"]', - model_meta=dict(owner='["jeff"]'), - status="skipped", - database_name="test_db", - schema_name="test_schema", - suppression_status="pending", + created_at="2022-10-10 08:00:00", + updated_at="2022-10-10 08:00:00", + status="pending", + data=dict( + id="3", + alert_class_id="elementary.model_id_2", + model_unique_id="elementary.model_id_2", + alias="model2", + path="my/path2", + original_path="", + materialization="table", + message="", + full_refresh=False, + detected_at="2022-10-10 08:00:00", + alert_suppression_interval=1, + tags='["three", "four"]', + model_meta=dict(owner='["jeff"]'), + status="skipped", + database_name="test_db", + schema_name="test_schema", + suppression_status="pending", + ), ), ] source_freshness_alerts = [ - PendingSourceFreshnessAlertSchema( + PendingAlertSchema( id="1", - source_freshness_execution_id="1", alert_class_id="elementary.model_id_1", - 
model_unique_id="elementary.model_id_1", - alias="modely", - path="my/path", - original_path="", - materialization="table", - message="", - full_refresh=False, - detected_at="2022-10-10 10:00:00", - alert_suppression_interval=0, - tags='["one", "two"]', - model_meta=dict(owner='["jeff", "john"]'), - original_status="error", - status="fail", - snapshotted_at="2023-08-15T12:26:06.884065+00:00", - max_loaded_at="1969-12-31T00:00:00+00:00", - max_loaded_at_time_ago_in_s=1692188766.884065, - source_name="elementary_integration_tests", - identifier="any_type_column_anomalies_validation", - error_after='{"count": null, "period": null}', - warn_after='{"count": 1, "period": "minute"}', - filter="null", - error="problemz", - database_name="test_db", - schema_name="test_schema", - suppression_status="pending", + type=AlertTypes.SOURCE_FRESHNESS, + detected_at="2022-10-10 08:00:00", + created_at="2022-10-10 08:00:00", + updated_at="2022-10-10 08:00:00", + status="pending", + data=dict( + id="1", + source_freshness_execution_id="1", + alert_class_id="elementary.model_id_1", + model_unique_id="elementary.model_id_1", + alias="modely", + path="my/path", + original_path="", + materialization="table", + message="", + full_refresh=False, + detected_at="2022-10-10 10:00:00", + alert_suppression_interval=0, + tags='["one", "two"]', + model_meta=dict(owner='["jeff", "john"]'), + original_status="error", + status="fail", + snapshotted_at="2023-08-15T12:26:06.884065+00:00", + max_loaded_at="1969-12-31T00:00:00+00:00", + max_loaded_at_time_ago_in_s=1692188766.884065, + source_name="elementary_integration_tests", + identifier="any_type_column_anomalies_validation", + error_after='{"count": null, "period": null}', + warn_after='{"count": 1, "period": "minute"}', + filter="null", + error="problemz", + database_name="test_db", + schema_name="test_schema", + suppression_status="pending", + ), ), - PendingSourceFreshnessAlertSchema( + PendingAlertSchema( id="2", - 
source_freshness_execution_id="2", alert_class_id="elementary.model_id_2", - model_unique_id="elementary.model_id_2", - alias="modely", - path="my/path", - original_path="", - materialization="table", - message="", - full_refresh=False, - detected_at="2022-10-10 10:00:00", - alert_suppression_interval=0, - tags='["one", "two"]', - model_meta=dict(owner='["jeff", "john"]'), - status="warn", - original_status="warn", - snapshotted_at="2023-08-15T12:26:06.884065+00:00", - max_loaded_at="1969-12-31T00:00:00+00:00", - max_loaded_at_time_ago_in_s=1692188766.884065, - source_name="elementary_integration_tests", - identifier="any_type_column_anomalies_validation", - error_after='{"count": null, "period": null}', - warn_after='{"count": 1, "period": "minute"}', - filter="null", - error="problemz", - database_name="test_db", - schema_name="test_schema", - suppression_status="pending", + type=AlertTypes.SOURCE_FRESHNESS, + detected_at="2022-10-10 08:00:00", + created_at="2022-10-10 08:00:00", + updated_at="2022-10-10 08:00:00", + status="pending", + data=dict( + id="2", + source_freshness_execution_id="2", + alert_class_id="elementary.model_id_2", + model_unique_id="elementary.model_id_2", + alias="modely", + path="my/path", + original_path="", + materialization="table", + message="", + full_refresh=False, + detected_at="2022-10-10 10:00:00", + alert_suppression_interval=0, + tags='["one", "two"]', + model_meta=dict(owner='["jeff", "john"]'), + status="warn", + original_status="warn", + snapshotted_at="2023-08-15T12:26:06.884065+00:00", + max_loaded_at="1969-12-31T00:00:00+00:00", + max_loaded_at_time_ago_in_s=1692188766.884065, + source_name="elementary_integration_tests", + identifier="any_type_column_anomalies_validation", + error_after='{"count": null, "period": null}', + warn_after='{"count": 1, "period": "minute"}', + filter="null", + error="problemz", + database_name="test_db", + schema_name="test_schema", + suppression_status="pending", + ), ), - 
PendingSourceFreshnessAlertSchema( + PendingAlertSchema( id="3", - source_freshness_execution_id="3", alert_class_id="elementary.model_id_3", - model_unique_id="elementary.model_id_3", - alias="modely", - path="my/path", - original_path="", - materialization="table", - message="", - full_refresh=False, - detected_at="2022-10-10 10:00:00", - alert_suppression_interval=0, - tags='["one", "two"]', - model_meta=dict(owner='["jeff", "john"]'), - original_status="runtime error", - status="error", - snapshotted_at="2023-08-15T12:26:06.884065+00:00", - max_loaded_at="1969-12-31T00:00:00+00:00", - max_loaded_at_time_ago_in_s=1692188766.884065, - source_name="elementary_integration_tests", - identifier="any_type_column_anomalies_validation", - error_after='{"count": null, "period": null}', - warn_after='{"count": 1, "period": "minute"}', - filter="null", - error="problemz", - database_name="test_db", - schema_name="test_schema", - suppression_status="pending", + type=AlertTypes.SOURCE_FRESHNESS, + detected_at="2022-10-10 08:00:00", + created_at="2022-10-10 08:00:00", + updated_at="2022-10-10 08:00:00", + status="pending", + data=dict( + id="3", + source_freshness_execution_id="3", + alert_class_id="elementary.model_id_3", + model_unique_id="elementary.model_id_3", + alias="modely", + path="my/path", + original_path="", + materialization="table", + message="", + full_refresh=False, + detected_at="2022-10-10 10:00:00", + alert_suppression_interval=0, + tags='["one", "two"]', + model_meta=dict(owner='["jeff", "john"]'), + original_status="runtime error", + status="error", + snapshotted_at="2023-08-15T12:26:06.884065+00:00", + max_loaded_at="1969-12-31T00:00:00+00:00", + max_loaded_at_time_ago_in_s=1692188766.884065, + source_name="elementary_integration_tests", + identifier="any_type_column_anomalies_validation", + error_after='{"count": null, "period": null}', + warn_after='{"count": 1, "period": "minute"}', + filter="null", + error="problemz", + database_name="test_db", + 
schema_name="test_schema", + suppression_status="pending", + ), ), ] return test_alerts, model_alerts, source_freshness_alerts diff --git a/tests/unit/monitor/api/alerts/test_alerts_api.py b/tests/unit/monitor/api/alerts/test_alerts_api.py index a4bb58b69..b9307e448 100644 --- a/tests/unit/monitor/api/alerts/test_alerts_api.py +++ b/tests/unit/monitor/api/alerts/test_alerts_api.py @@ -6,23 +6,19 @@ def test_get_suppressed_alerts(alerts_api_mock: MockAlertsAPI): - last_test_alert_sent_times = ( - alerts_api_mock.alerts_fetcher.query_last_test_alert_times() - ) - last_model_alert_sent_times = ( - alerts_api_mock.alerts_fetcher.query_last_model_alert_times() - ) + last_alert_sent_times = alerts_api_mock.alerts_fetcher.query_last_alert_times() - test_alerts = alerts_api_mock.alerts_fetcher.query_pending_test_alerts() - model_alerts = alerts_api_mock.alerts_fetcher.query_pending_model_alerts() + alerts = alerts_api_mock.alerts_fetcher.query_pending_alerts() + test_alerts = [alert for alert in alerts if alert.type == "test"] + model_alerts = [alert for alert in alerts if alert.type == "model"] suppressed_test_alerts = alerts_api_mock._get_suppressed_alerts( test_alerts, - last_test_alert_sent_times, + last_alert_sent_times, ) suppressed_model_alerts = alerts_api_mock._get_suppressed_alerts( model_alerts, - last_model_alert_sent_times, + last_alert_sent_times, ) assert json.dumps(suppressed_test_alerts, sort_keys=True) == json.dumps( @@ -34,23 +30,19 @@ def test_get_suppressed_alerts(alerts_api_mock: MockAlertsAPI): def test_sort_alerts(alerts_api_mock: MockAlertsAPI): - last_test_alert_sent_times = ( - alerts_api_mock.alerts_fetcher.query_last_test_alert_times() - ) - last_model_alert_sent_times = ( - alerts_api_mock.alerts_fetcher.query_last_model_alert_times() - ) + last_alert_sent_times = alerts_api_mock.alerts_fetcher.query_last_alert_times() - test_alerts = alerts_api_mock.alerts_fetcher.query_pending_test_alerts() - model_alerts = 
alerts_api_mock.alerts_fetcher.query_pending_model_alerts() + alerts = alerts_api_mock.alerts_fetcher.query_pending_alerts() + test_alerts = [alert for alert in alerts if alert.type == "test"] + model_alerts = [alert for alert in alerts if alert.type == "model"] sorted_test_alerts = alerts_api_mock._sort_alerts( test_alerts, - last_test_alert_sent_times, + last_alert_sent_times, ) sorted_model_alerts = alerts_api_mock._sort_alerts( model_alerts, - last_model_alert_sent_times, + last_alert_sent_times, ) # alert_id_1 is suppressed and alert_id_5 is duplicated @@ -76,8 +68,9 @@ def test_sort_alerts(alerts_api_mock: MockAlertsAPI): def test_get_latest_alerts(alerts_api_mock: MockAlertsAPI): - test_alerts = alerts_api_mock.alerts_fetcher.query_pending_test_alerts() - model_alerts = alerts_api_mock.alerts_fetcher.query_pending_model_alerts() + alerts = alerts_api_mock.alerts_fetcher.query_pending_alerts() + test_alerts = [alert for alert in alerts if alert.type == "test"] + model_alerts = [alert for alert in alerts if alert.type == "model"] latest_test_alerts = alerts_api_mock._get_latest_alerts(test_alerts) latest_model_alerts = alerts_api_mock._get_latest_alerts(model_alerts) diff --git a/tests/unit/monitor/data_monitoring/alerts/test_data_monitoring_alerts.py b/tests/unit/monitor/data_monitoring/alerts/test_data_monitoring_alerts.py index 7a3a595b0..1483c2130 100644 --- a/tests/unit/monitor/data_monitoring/alerts/test_data_monitoring_alerts.py +++ b/tests/unit/monitor/data_monitoring/alerts/test_data_monitoring_alerts.py @@ -5,7 +5,7 @@ from elementary.monitor.alerts.model_alert import ModelAlertModel from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel from elementary.monitor.alerts.test_alert import TestAlertModel -from elementary.monitor.api.alerts.schema import AlertsSchema +from elementary.monitor.api.alerts.schema import SortedAlertsSchema from elementary.monitor.data_monitoring.alerts.integrations.slack.slack import ( 
SlackIntegration, ) @@ -23,7 +23,7 @@ def test_get_integration_client(data_monitoring_alerts_mock: DataMonitoringAlert def test_fetch_data(data_monitoring_alerts_mock: DataMonitoringAlertsMock): mock_alerts_api = MockAlertsAPI() alerts_data = data_monitoring_alerts_mock._fetch_data(days_back=1) - assert isinstance(alerts_data, AlertsSchema) + assert isinstance(alerts_data, SortedAlertsSchema) assert alerts_data.json(sort_keys=True) == mock_alerts_api.get_new_alerts( days_back=1 ).json(sort_keys=True) @@ -31,7 +31,7 @@ def test_fetch_data(data_monitoring_alerts_mock: DataMonitoringAlertsMock): def test_format_alerts(data_monitoring_alerts_mock: DataMonitoringAlertsMock): alerts = data_monitoring_alerts_mock._fetch_data(days_back=1) - formatted_alerts = data_monitoring_alerts_mock._format_alerts(alerts) + formatted_alerts = data_monitoring_alerts_mock._format_alerts(alerts.send) test_alerts = [ alert for alert in formatted_alerts if isinstance(alert, TestAlertModel) ] @@ -55,7 +55,7 @@ def test_format_alerts(data_monitoring_alerts_mock: DataMonitoringAlertsMock): # test format group by table data_monitoring_alerts_mock.config.slack_group_alerts_by = "table" - formatted_alerts = data_monitoring_alerts_mock._format_alerts(alerts) + formatted_alerts = data_monitoring_alerts_mock._format_alerts(alerts.send) sorted_formatted_alerts = sorted( formatted_alerts, key=lambda alert: alert.model_unique_id ) diff --git a/tests/unit/monitor/fetchers/alerts/schemas/__init__.py b/tests/unit/monitor/fetchers/alerts/schemas/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/monitor/fetchers/alerts/test_alerts_schema.py b/tests/unit/monitor/fetchers/alerts/schemas/test_alert_data_schema.py similarity index 75% rename from tests/unit/monitor/fetchers/alerts/test_alerts_schema.py rename to tests/unit/monitor/fetchers/alerts/schemas/test_alert_data_schema.py index 514fdc8d2..b66721e22 100644 --- a/tests/unit/monitor/fetchers/alerts/test_alerts_schema.py 
+++ b/tests/unit/monitor/fetchers/alerts/schemas/test_alert_data_schema.py @@ -3,9 +3,9 @@ import pytest -from elementary.monitor.fetchers.alerts.schema.pending_alerts import ( - BasePendingAlertSchema, - PendingTestAlertSchema, +from elementary.monitor.fetchers.alerts.schema.alert_data import ( + BaseAlertDataSchema, + TestAlertDataSchema, ) from elementary.utils.time import DATETIME_FORMAT @@ -40,16 +40,16 @@ def test_flatten_meta(): - flatten_meta = BasePendingAlertSchema._flatten_meta() + flatten_meta = BaseAlertDataSchema._flatten_meta() assert isinstance(flatten_meta, dict) assert len(flatten_meta) == 0 - flatten_meta = BasePendingAlertSchema._flatten_meta(dict(a="a")) + flatten_meta = BaseAlertDataSchema._flatten_meta(dict(a="a")) assert json.dumps(flatten_meta, sort_keys=True) == json.dumps( dict(a="a"), sort_keys=True ) - flatten_meta = BasePendingAlertSchema._flatten_meta( + flatten_meta = BaseAlertDataSchema._flatten_meta( dict(a="a", alerts_config=dict(a="b")) ) assert json.dumps(flatten_meta, sort_keys=True) == json.dumps( @@ -71,9 +71,9 @@ def test_flatten_meta(): ) def test_get_alert_meta_attrs(model_meta, test_meta, expected): alert = ( - BasePendingAlertSchema(**{**BASE_ALERT, "model_meta": model_meta}) + BaseAlertDataSchema(**{**BASE_ALERT, "model_meta": model_meta}) if test_meta is None - else PendingTestAlertSchema( + else TestAlertDataSchema( **{**TEST_ALERT, "model_meta": model_meta, "test_meta": test_meta} ) ) @@ -81,7 +81,7 @@ def test_get_alert_meta_attrs(model_meta, test_meta, expected): def test_get_suppression_interval(): - base_alert = BasePendingAlertSchema( + base_alert = BaseAlertDataSchema( **{**BASE_ALERT, "model_meta": dict(alert_suppression_interval=1)} ) assert ( @@ -93,7 +93,7 @@ def test_get_suppression_interval(): == 2 ) - base_alert = BasePendingAlertSchema(**BASE_ALERT) + base_alert = BaseAlertDataSchema(**BASE_ALERT) assert ( base_alert.get_suppression_interval(interval_from_cli=2, override_by_cli=False) == 2 @@ -105,17 
+105,17 @@ def test_get_suppression_interval(): def test_tags(): - base_alert = BasePendingAlertSchema(**BASE_ALERT) + base_alert = BaseAlertDataSchema(**BASE_ALERT) assert base_alert.tags == [] - base_alert = BasePendingAlertSchema(**{**BASE_ALERT, "tags": ["a", "b"]}) + base_alert = BaseAlertDataSchema(**{**BASE_ALERT, "tags": ["a", "b"]}) assert sorted(base_alert.tags) == sorted(["a", "b"]) - base_alert = BasePendingAlertSchema(**{**BASE_ALERT, "tags": '["a", "b"]'}) + base_alert = BaseAlertDataSchema(**{**BASE_ALERT, "tags": '["a", "b"]'}) assert sorted(base_alert.tags) == sorted(["a", "b"]) - base_alert = BasePendingAlertSchema(**{**BASE_ALERT, "tags": "a, b"}) + base_alert = BaseAlertDataSchema(**{**BASE_ALERT, "tags": "a, b"}) assert sorted(base_alert.tags) == sorted(["a", "b"]) - base_alert = BasePendingAlertSchema(**{**BASE_ALERT, "tags": "a"}) + base_alert = BaseAlertDataSchema(**{**BASE_ALERT, "tags": "a"}) assert sorted(base_alert.tags) == sorted(["a"]) diff --git a/tests/unit/monitor/fetchers/alerts/test_alerts_fetcher.py b/tests/unit/monitor/fetchers/alerts/test_alerts_fetcher.py index 513f7b3c6..fb9dec9bb 100644 --- a/tests/unit/monitor/fetchers/alerts/test_alerts_fetcher.py +++ b/tests/unit/monitor/fetchers/alerts/test_alerts_fetcher.py @@ -44,15 +44,15 @@ def test_update_sent_alerts( @mock.patch("subprocess.run") def test_skip_alerts(mock_subprocess_run, alerts_fetcher_mock: MockAlertsFetcher): # Create 100 alerts - test_alerts = alerts_fetcher_mock.query_pending_test_alerts() - mock_alerts_ids_to_skip = test_alerts * 20 + alerts = alerts_fetcher_mock.query_pending_alerts() + mock_alerts_ids_to_skip = alerts * 20 alerts_fetcher_mock.skip_alerts( alerts_to_skip=mock_alerts_ids_to_skip, ) # Test that alert ids were split into chunks - assert mock_subprocess_run.call_count == 2 + assert mock_subprocess_run.call_count == 6 calls_args = mock_subprocess_run.call_args_list for call_args in calls_args: From bbe04dff0506e8e12d8911b4da038d7c71614693 Mon Sep 
17 00:00:00 2001 From: IDoneShaveIt Date: Mon, 22 Jan 2024 16:15:08 +0200 Subject: [PATCH 2/3] Fix test_alerts_fetcher integration tests --- .../monitor/api/alerts/test_alerts_fetcher.py | 28 ++----------------- 1 file changed, 3 insertions(+), 25 deletions(-) diff --git a/tests/integration/monitor/api/alerts/test_alerts_fetcher.py b/tests/integration/monitor/api/alerts/test_alerts_fetcher.py index 2d5a1ee04..4b34281b4 100644 --- a/tests/integration/monitor/api/alerts/test_alerts_fetcher.py +++ b/tests/integration/monitor/api/alerts/test_alerts_fetcher.py @@ -1,38 +1,16 @@ import json +from elementary.monitor.api.alerts.schema import SortedAlertsSchema from tests.mocks.api.alerts_api_mock import MockAlertsAPI -def test_get_test_alerts(): - api = MockAlertsAPI() - test_alerts = api.get_test_alerts(days_back=1) - assert_new_alerts(test_alerts) - - -def test_get_model_alerts(): - api = MockAlertsAPI() - model_alerts = api.get_model_alerts(days_back=1) - assert_new_alerts(model_alerts) - - -def test_get_source_freshness_alerts(): - api = MockAlertsAPI() - source_freshness_alerts = api.get_source_freshness_alerts(days_back=1) - assert_new_alerts(source_freshness_alerts) - - def test_get_new_alerts(): api = MockAlertsAPI() alerts = api.get_new_alerts(days_back=1) - test_alerts = alerts.tests - model_alerts = alerts.models - source_freshness_alerts = alerts.source_freshnesses - assert_new_alerts(test_alerts) - assert_new_alerts(model_alerts) - assert_new_alerts(source_freshness_alerts) + assert_new_alerts(alerts) -def assert_new_alerts(new_alerts): +def assert_new_alerts(new_alerts: SortedAlertsSchema): alerts_to_send = new_alerts.send alerts_to_skip = new_alerts.skip From 27f7578b521c535910652af1af771edf17c2de8d Mon Sep 17 00:00:00 2001 From: IDoneShaveIt Date: Mon, 22 Jan 2024 16:22:39 +0200 Subject: [PATCH 3/3] Fix test_alerts_fetcher integration tests --- .../monitor/api/alerts/test_alerts_fetcher.py | 56 ++++++++++++++++--- 1 file changed, 47 insertions(+), 9 
deletions(-) diff --git a/tests/integration/monitor/api/alerts/test_alerts_fetcher.py b/tests/integration/monitor/api/alerts/test_alerts_fetcher.py index 4b34281b4..7a0e4b04e 100644 --- a/tests/integration/monitor/api/alerts/test_alerts_fetcher.py +++ b/tests/integration/monitor/api/alerts/test_alerts_fetcher.py @@ -1,25 +1,42 @@ import json -from elementary.monitor.api.alerts.schema import SortedAlertsSchema +from elementary.monitor.fetchers.alerts.schema.pending_alerts import AlertTypes from tests.mocks.api.alerts_api_mock import MockAlertsAPI def test_get_new_alerts(): api = MockAlertsAPI() alerts = api.get_new_alerts(days_back=1) - assert_new_alerts(alerts) - - -def assert_new_alerts(new_alerts: SortedAlertsSchema): - alerts_to_send = new_alerts.send - alerts_to_skip = new_alerts.skip + alerts_to_send = alerts.send + alerts_to_skip = alerts.skip # Test the following tests are not suppressed: # - Alert after suppression interval # - Alert without suppression interval # - First occurrence alert with suppression interval assert json.dumps( - [alert.id for alert in alerts_to_send], sort_keys=True + [ + alert.id + for alert in alerts_to_send + if AlertTypes(alert.type) is AlertTypes.TEST + ], + sort_keys=True, + ) == json.dumps(["alert_id_2", "alert_id_3", "alert_id_4"], sort_keys=True) + assert json.dumps( + [ + alert.id + for alert in alerts_to_send + if AlertTypes(alert.type) is AlertTypes.MODEL + ], + sort_keys=True, + ) == json.dumps(["alert_id_2", "alert_id_3", "alert_id_4"], sort_keys=True) + assert json.dumps( + [ + alert.id + for alert in alerts_to_send + if AlertTypes(alert.type) is AlertTypes.SOURCE_FRESHNESS + ], + sort_keys=True, ) == json.dumps(["alert_id_2", "alert_id_3", "alert_id_4"], sort_keys=True) # Test the following tests are suppressed: @@ -27,5 +44,26 @@ def assert_new_alerts(new_alerts: SortedAlertsSchema): # Test the following tests are skipped due to dedup: # - alert_id_5 (Duplication of alert_id_4 with earlier detected time) assert 
json.dumps( - [alert.id for alert in alerts_to_skip], sort_keys=True + [ + alert.id + for alert in alerts_to_skip + if AlertTypes(alert.type) is AlertTypes.TEST + ], + sort_keys=True, + ) == json.dumps(["alert_id_1", "alert_id_5"], sort_keys=True) + assert json.dumps( + [ + alert.id + for alert in alerts_to_skip + if AlertTypes(alert.type) is AlertTypes.MODEL + ], + sort_keys=True, + ) == json.dumps(["alert_id_1", "alert_id_5"], sort_keys=True) + assert json.dumps( + [ + alert.id + for alert in alerts_to_skip + if AlertTypes(alert.type) is AlertTypes.SOURCE_FRESHNESS + ], + sort_keys=True, ) == json.dumps(["alert_id_1", "alert_id_5"], sort_keys=True)