Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ELE-2390 - better logic separation for DataMonitoringAlerts flow #1384

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 41 additions & 104 deletions elementary/monitor/api/alerts/alert_filters.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
from functools import reduce
from typing import List, Union
from typing import List

from elementary.monitor.data_monitoring.schema import (
FilterSchema,
Expand All @@ -9,27 +9,18 @@
StatusFilterSchema,
)
from elementary.monitor.fetchers.alerts.schema.pending_alerts import (
PendingModelAlertSchema,
PendingSourceFreshnessAlertSchema,
PendingTestAlertSchema,
AlertTypes,
PendingAlertSchema,
)
from elementary.utils.log import get_logger

logger = get_logger(__name__)


def filter_alerts(
alerts: Union[
List[PendingTestAlertSchema],
List[PendingModelAlertSchema],
List[PendingSourceFreshnessAlertSchema],
],
alerts: List[PendingAlertSchema],
alerts_filter: FiltersSchema = FiltersSchema(),
) -> Union[
List[PendingTestAlertSchema],
List[PendingModelAlertSchema],
List[PendingSourceFreshnessAlertSchema],
]:
) -> List[PendingAlertSchema]:
# If the filter is on invocation stuff, it's not relevant to alerts and we return an empty list
if (
alerts_filter.invocation_id is not None
Expand Down Expand Up @@ -59,21 +50,9 @@ def filter_alerts(


def _find_common_alerts(
first_alerts: Union[
List[PendingTestAlertSchema],
List[PendingModelAlertSchema],
List[PendingSourceFreshnessAlertSchema],
],
second_alerts: Union[
List[PendingTestAlertSchema],
List[PendingModelAlertSchema],
List[PendingSourceFreshnessAlertSchema],
],
) -> Union[
List[PendingTestAlertSchema],
List[PendingModelAlertSchema],
List[PendingSourceFreshnessAlertSchema],
]:
first_alerts: List[PendingAlertSchema],
second_alerts: List[PendingAlertSchema],
) -> List[PendingAlertSchema]:
first_hashable_alerts = [alert.json(sort_keys=True) for alert in first_alerts]
second_hashable_alerts = [alert.json(sort_keys=True) for alert in second_alerts]
common_hashable_alerts = [
Expand All @@ -94,17 +73,9 @@ def _find_common_alerts(


def _filter_alerts_by_tags(
alerts: Union[
List[PendingTestAlertSchema],
List[PendingModelAlertSchema],
List[PendingSourceFreshnessAlertSchema],
],
alerts: List[PendingAlertSchema],
tags_filters: List[FilterSchema],
) -> Union[
List[PendingTestAlertSchema],
List[PendingModelAlertSchema],
List[PendingSourceFreshnessAlertSchema],
]:
) -> List[PendingAlertSchema]:
if not tags_filters:
return [*alerts]

Expand All @@ -114,26 +85,18 @@ def _filter_alerts_by_tags(
for tags_filter in tags_filters:
filtered_alerts_by_tags = []
for alert in alerts:
if any(tag in (alert.tags or []) for tag in tags_filter.values):
if any(tag in (alert.data.tags or []) for tag in tags_filter.values):
filtered_alerts_by_tags.append(alert)
grouped_filtered_alerts_by_tags.append(filtered_alerts_by_tags)

# AND filter between all tags_filters
return reduce(_find_common_alerts, grouped_filtered_alerts_by_tags) # type: ignore[return-value, arg-type]
return reduce(_find_common_alerts, grouped_filtered_alerts_by_tags)


def _filter_alerts_by_owners(
alerts: Union[
List[PendingTestAlertSchema],
List[PendingModelAlertSchema],
List[PendingSourceFreshnessAlertSchema],
],
alerts: List[PendingAlertSchema],
owners_filters: List[FilterSchema],
) -> Union[
List[PendingTestAlertSchema],
List[PendingModelAlertSchema],
List[PendingSourceFreshnessAlertSchema],
]:
) -> List[PendingAlertSchema]:
if not owners_filters:
return [*alerts]

Expand All @@ -143,26 +106,20 @@ def _filter_alerts_by_owners(
for owners_filter in owners_filters:
filtered_alerts_by_owners = []
for alert in alerts:
if any(owner in alert.unified_owners for owner in owners_filter.values):
if any(
owner in alert.data.unified_owners for owner in owners_filter.values
):
filtered_alerts_by_owners.append(alert)
grouped_filtered_alerts_by_owners.append(filtered_alerts_by_owners)

# AND filter between all owners_filters
return reduce(_find_common_alerts, grouped_filtered_alerts_by_owners) # type: ignore[return-value, arg-type]
return reduce(_find_common_alerts, grouped_filtered_alerts_by_owners)


def _filter_alerts_by_models(
alerts: Union[
List[PendingTestAlertSchema],
List[PendingModelAlertSchema],
List[PendingSourceFreshnessAlertSchema],
],
alerts: List[PendingAlertSchema],
models_filters: List[FilterSchema],
) -> Union[
List[PendingTestAlertSchema],
List[PendingModelAlertSchema],
List[PendingSourceFreshnessAlertSchema],
]:
) -> List[PendingAlertSchema]:
if not models_filters:
return [*alerts]

Expand All @@ -173,40 +130,36 @@ def _filter_alerts_by_models(
filtered_alerts_by_models = []
for alert in alerts:
if any(
(alert.model_unique_id and alert.model_unique_id.endswith(model))
(
alert.data.model_unique_id
and alert.data.model_unique_id.endswith(model)
)
for model in models_filter.values
):
filtered_alerts_by_models.append(alert)
grouped_filtered_alerts_by_models.append(filtered_alerts_by_models)

# AND filter between all models_filters
return reduce(_find_common_alerts, grouped_filtered_alerts_by_models) # type: ignore[return-value, arg-type]
return reduce(_find_common_alerts, grouped_filtered_alerts_by_models)


def _filter_alerts_by_node_names(
alerts: Union[
List[PendingTestAlertSchema],
List[PendingModelAlertSchema],
List[PendingSourceFreshnessAlertSchema],
],
alerts: List[PendingAlertSchema],
node_names_filters: List[str],
) -> Union[
List[PendingTestAlertSchema],
List[PendingModelAlertSchema],
List[PendingSourceFreshnessAlertSchema],
]:
) -> List[PendingAlertSchema]:
if not node_names_filters:
return [*alerts]

filtered_alerts = []
for alert in alerts:
alert_node_name = None
if isinstance(alert, PendingTestAlertSchema):
alert_node_name = alert.test_name
elif isinstance(alert, PendingModelAlertSchema) or isinstance(
alert, PendingSourceFreshnessAlertSchema
alert_type = AlertTypes(alert.type)
if alert_type is AlertTypes.TEST:
alert_node_name = alert.data.test_name # type: ignore[union-attr]
elif (
alert_type is AlertTypes.MODEL or alert_type is AlertTypes.SOURCE_FRESHNESS
):
alert_node_name = alert.model_unique_id
alert_node_name = alert.data.model_unique_id
else:
# Shouldn't happen
raise Exception(f"Unexpected alert type: {type(alert)}")
Expand All @@ -222,17 +175,9 @@ def _filter_alerts_by_node_names(


def _filter_alerts_by_statuses(
alerts: Union[
List[PendingTestAlertSchema],
List[PendingModelAlertSchema],
List[PendingSourceFreshnessAlertSchema],
],
alerts: List[PendingAlertSchema],
statuses_filters: List[StatusFilterSchema],
) -> Union[
List[PendingTestAlertSchema],
List[PendingModelAlertSchema],
List[PendingSourceFreshnessAlertSchema],
]:
) -> List[PendingAlertSchema]:
if not statuses_filters:
return [*alerts]

Expand All @@ -242,26 +187,18 @@ def _filter_alerts_by_statuses(
for statuses_filter in statuses_filters:
filtered_alerts_by_statuses = []
for alert in alerts:
if any(status == alert.status for status in statuses_filter.values):
if any(status == alert.data.status for status in statuses_filter.values):
filtered_alerts_by_statuses.append(alert)
grouped_filtered_alerts_by_statuses.append(filtered_alerts_by_statuses)

# AND filter between all statuses_filters
return reduce(_find_common_alerts, grouped_filtered_alerts_by_statuses) # type: ignore[return-value, arg-type]
return reduce(_find_common_alerts, grouped_filtered_alerts_by_statuses)


def _filter_alerts_by_resource_types(
alerts: Union[
List[PendingTestAlertSchema],
List[PendingModelAlertSchema],
List[PendingSourceFreshnessAlertSchema],
],
alerts: List[PendingAlertSchema],
resource_types_filters: List[ResourceTypeFilterSchema],
) -> Union[
List[PendingTestAlertSchema],
List[PendingModelAlertSchema],
List[PendingSourceFreshnessAlertSchema],
]:
) -> List[PendingAlertSchema]:
if not resource_types_filters:
return [*alerts]

Expand All @@ -272,7 +209,7 @@ def _filter_alerts_by_resource_types(
filtered_alerts_by_resource_types = []
for alert in alerts:
if any(
resource_type == alert.resource_type.value
resource_type == alert.data.resource_type.value
for resource_type in resource_types_filter.values
):
filtered_alerts_by_resource_types.append(alert)
Expand All @@ -281,4 +218,4 @@ def _filter_alerts_by_resource_types(
)

# AND filter between all resource_types_filters
return reduce(_find_common_alerts, grouped_filtered_alerts_by_resource_types) # type: ignore[return-value, arg-type]
return reduce(_find_common_alerts, grouped_filtered_alerts_by_resource_types)
Loading