diff --git a/src/sentry/workflow_engine/handlers/condition/__init__.py b/src/sentry/workflow_engine/handlers/condition/__init__.py index 68066f7706952f..b45e6a7a66494b 100644 --- a/src/sentry/workflow_engine/handlers/condition/__init__.py +++ b/src/sentry/workflow_engine/handlers/condition/__init__.py @@ -1,5 +1,7 @@ __all__ = [ "EventCreatedByDetectorConditionHandler", + "EventFrequencyCountHandler", + "EventFrequencyPercentHandler", "EventSeenCountConditionHandler", "EveryEventConditionHandler", "ReappearedEventConditionHandler", @@ -23,6 +25,7 @@ from .assigned_to_handler import AssignedToConditionHandler from .event_attribute_handler import EventAttributeConditionHandler from .event_created_by_detector_handler import EventCreatedByDetectorConditionHandler +from .event_frequency_handlers import EventFrequencyCountHandler, EventFrequencyPercentHandler from .event_seen_count_handler import EventSeenCountConditionHandler from .every_event_handler import EveryEventConditionHandler from .existing_high_priority_issue_handler import ExistingHighPriorityIssueConditionHandler diff --git a/src/sentry/workflow_engine/handlers/condition/event_frequency_handlers.py b/src/sentry/workflow_engine/handlers/condition/event_frequency_handlers.py index dd8c93dbd9c34a..30a067b8a7a436 100644 --- a/src/sentry/workflow_engine/handlers/condition/event_frequency_handlers.py +++ b/src/sentry/workflow_engine/handlers/condition/event_frequency_handlers.py @@ -16,7 +16,7 @@ ) from sentry.workflow_engine.models.data_condition import Condition from sentry.workflow_engine.registry import condition_handler_registry -from sentry.workflow_engine.types import DataConditionHandler, DataConditionResult +from sentry.workflow_engine.types import DataConditionHandler, DataConditionResult, WorkflowJob class EventFrequencyConditionHandler(BaseEventFrequencyConditionHandler): @@ -59,7 +59,7 @@ def get_result(model: TSDBModel, group_ids: list[int]) -> dict[int, int]: @condition_handler_registry.register(Condition.EVENT_FREQUENCY_COUNT) -class EventFrequencyCountHandler(EventFrequencyConditionHandler, DataConditionHandler[int]): +class EventFrequencyCountHandler(EventFrequencyConditionHandler, DataConditionHandler[WorkflowJob]): comparison_json_schema = { "type": "object", "properties": { @@ -71,12 +71,16 @@ class EventFrequencyCountHandler(EventFrequencyConditionHandler, DataConditionHa } @staticmethod - def evaluate_value(value: int, comparison: Any) -> DataConditionResult: - return value > comparison["value"] + def evaluate_value(value: WorkflowJob, comparison: Any) -> DataConditionResult: + if len(value.get("snuba_results", [])) != 1: + return False + return value["snuba_results"][0] > comparison["value"] @condition_handler_registry.register(Condition.EVENT_FREQUENCY_PERCENT) -class EventFrequencyPercentHandler(EventFrequencyConditionHandler, DataConditionHandler[list[int]]): +class EventFrequencyPercentHandler( + EventFrequencyConditionHandler, DataConditionHandler[WorkflowJob] +): comparison_json_schema = { "type": "object", "properties": { @@ -89,7 +93,10 @@ class EventFrequencyPercentHandler(EventFrequencyConditionHandler, DataCondition } @staticmethod - def evaluate_value(value: list[int], comparison: Any) -> DataConditionResult: - if len(value) != 2: + def evaluate_value(value: WorkflowJob, comparison: Any) -> DataConditionResult: + if len(value.get("snuba_results", [])) != 2: return False - return percent_increase(value[0], value[1]) > comparison["value"] + return ( + percent_increase(value["snuba_results"][0], value["snuba_results"][1]) + > comparison["value"] + ) diff --git a/src/sentry/workflow_engine/models/data_condition.py b/src/sentry/workflow_engine/models/data_condition.py index 1a364f21b8d507..3788527647c7a2 100644 --- a/src/sentry/workflow_engine/models/data_condition.py +++ b/src/sentry/workflow_engine/models/data_condition.py @@ -66,6 +66,18 @@ class Condition(models.TextChoices): Condition.NOT_EQUAL: operator.ne, } +SLOW_CONDITIONS = [ + Condition.EVENT_FREQUENCY_COUNT, + Condition.EVENT_FREQUENCY_PERCENT, + Condition.EVENT_UNIQUE_USER_FREQUENCY_COUNT, + Condition.EVENT_UNIQUE_USER_FREQUENCY_PERCENT, + Condition.PERCENT_SESSIONS_COUNT, + Condition.PERCENT_SESSIONS_PERCENT, + Condition.EVENT_UNIQUE_USER_FREQUENCY_WITH_CONDITIONS_COUNT, + Condition.EVENT_UNIQUE_USER_FREQUENCY_WITH_CONDITIONS_PERCENT, +] + + T = TypeVar("T") @@ -140,18 +152,6 @@ def evaluate_value(self, value: T) -> DataConditionResult: return self.get_condition_result() if result else None -SLOW_CONDITIONS = [ - Condition.EVENT_FREQUENCY_COUNT, - Condition.EVENT_FREQUENCY_PERCENT, - Condition.EVENT_UNIQUE_USER_FREQUENCY_COUNT, - Condition.EVENT_UNIQUE_USER_FREQUENCY_PERCENT, - Condition.PERCENT_SESSIONS_COUNT, - Condition.PERCENT_SESSIONS_PERCENT, - Condition.EVENT_UNIQUE_USER_FREQUENCY_WITH_CONDITIONS_COUNT, - Condition.EVENT_UNIQUE_USER_FREQUENCY_WITH_CONDITIONS_PERCENT, -] - - def is_slow_condition(cond: DataCondition) -> bool: return Condition(cond.type) in SLOW_CONDITIONS diff --git a/src/sentry/workflow_engine/models/workflow.py b/src/sentry/workflow_engine/models/workflow.py index b37dece9419179..9623d73d13df6a 100644 --- a/src/sentry/workflow_engine/models/workflow.py +++ b/src/sentry/workflow_engine/models/workflow.py @@ -9,6 +9,7 @@ from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model, sane_repr from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey from sentry.models.owner_base import OwnerModel +from sentry.workflow_engine.models.data_condition import DataCondition, is_slow_condition from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group from sentry.workflow_engine.types import WorkflowJob @@ -77,6 +78,18 @@ def evaluate_trigger_conditions(self, job: WorkflowJob) -> bool: return evaluation +def get_slow_conditions(workflow: Workflow) -> list[DataCondition]: + if not workflow.when_condition_group: + return [] + + slow_conditions = [ + condition + for condition in workflow.when_condition_group.conditions.all() + if is_slow_condition(condition) + ] + return slow_conditions + + @receiver(pre_save, sender=Workflow) def enforce_config_schema(sender, instance: Workflow, **kwargs): instance.validate_config(instance.config_schema) diff --git a/src/sentry/workflow_engine/processors/data_condition_group.py b/src/sentry/workflow_engine/processors/data_condition_group.py index 637e91c5a6b34d..788836eae28b33 100644 --- a/src/sentry/workflow_engine/processors/data_condition_group.py +++ b/src/sentry/workflow_engine/processors/data_condition_group.py @@ -28,11 +28,6 @@ def evaluate_condition_group( results = [] conditions = get_data_conditions_for_group(data_condition_group.id) - # TODO - @saponifi3d - # Split the conditions into fast and slow conditions - # Evaluate the fast conditions first, if any are met, return early - # Enqueue the slow conditions to be evaluated later - if len(conditions) == 0: # if we don't have any conditions, always return True return True, [] @@ -54,12 +49,14 @@ def evaluate_condition_group( if data_condition_group.logic_type == data_condition_group.Type.NONE: # if we get to this point, no conditions were met return True, [] + elif data_condition_group.logic_type == data_condition_group.Type.ANY: is_any_condition_met = any([result[0] for result in results]) if is_any_condition_met: condition_results = [result[1] for result in results if result[0]] return is_any_condition_met, condition_results + elif data_condition_group.logic_type == data_condition_group.Type.ALL: conditions_met = [result[0] for result in results] is_all_conditions_met = all(conditions_met) diff --git a/src/sentry/workflow_engine/processors/workflow.py b/src/sentry/workflow_engine/processors/workflow.py index be2fe0f88e4e92..1baab325c4ae27 100644 --- a/src/sentry/workflow_engine/processors/workflow.py +++ b/src/sentry/workflow_engine/processors/workflow.py @@ -1,22 +1,83 @@ import logging +from collections import defaultdict import sentry_sdk -from sentry.utils import metrics -from sentry.workflow_engine.models import Detector, Workflow +from sentry import buffer +from sentry.utils import json, metrics +from sentry.workflow_engine.models import Detector, Workflow, WorkflowDataConditionGroup +from sentry.workflow_engine.models.workflow import get_slow_conditions from sentry.workflow_engine.processors.action import evaluate_workflow_action_filters +from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group from sentry.workflow_engine.processors.detector import get_detector_by_event from sentry.workflow_engine.types import WorkflowJob logger = logging.getLogger(__name__) +WORKFLOW_ENGINE_BUFFER_LIST_KEY = "workflow_engine_delayed_processing_buffer" + + +def get_data_condition_groups_to_fire( + workflows: set[Workflow], job: WorkflowJob +) -> dict[int, list[int]]: + workflow_action_groups: dict[int, list[int]] = defaultdict(list) + + workflow_ids = {workflow.id for workflow in workflows} + + workflow_dcgs = WorkflowDataConditionGroup.objects.filter( + workflow_id__in=workflow_ids + ).select_related("condition_group", "workflow") + + for workflow_dcg in workflow_dcgs: + action_condition = workflow_dcg.condition_group + evaluation, result = evaluate_condition_group(action_condition, job) + + if evaluation: + workflow_action_groups[workflow_dcg.workflow_id].append(action_condition.id) + + return workflow_action_groups + + +def enqueue_workflows( + workflows: set[Workflow], + job: WorkflowJob, +) -> None: + event = job["event"] + project_id = event.group.project.id + workflow_action_groups = get_data_condition_groups_to_fire(workflows, job) + + for workflow in workflows: + buffer.backend.push_to_sorted_set(key=WORKFLOW_ENGINE_BUFFER_LIST_KEY, value=project_id) + + action_filters = workflow_action_groups.get(workflow.id, []) + if not action_filters: + continue + + action_filter_fields = ":".join(map(str, action_filters)) + + value = json.dumps({"event_id": event.event_id, "occurrence_id": event.occurrence_id}) + buffer.backend.push_to_hash( + model=Workflow, + filters={"project": project_id}, + field=f"{workflow.id}:{event.group.id}:{action_filter_fields}", + value=value, + ) + def evaluate_workflow_triggers(workflows: set[Workflow], job: WorkflowJob) -> set[Workflow]: triggered_workflows: set[Workflow] = set() + workflows_to_enqueue: set[Workflow] = set() for workflow in workflows: if workflow.evaluate_trigger_conditions(job): triggered_workflows.add(workflow) + else: + if get_slow_conditions(workflow): + # enqueue to be evaluated later + workflows_to_enqueue.add(workflow) + + if workflows_to_enqueue: + enqueue_workflows(workflows_to_enqueue, job) return triggered_workflows diff --git a/src/sentry/workflow_engine/types.py b/src/sentry/workflow_engine/types.py index a5833a47916bc7..e53f898598392c 100644 --- a/src/sentry/workflow_engine/types.py +++ b/src/sentry/workflow_engine/types.py @@ -40,6 +40,7 @@ class WorkflowJob(EventJob, total=False): has_alert: bool has_escalated: bool workflow: Workflow + snuba_results: list[int] # TODO - @saponifi3 / TODO(cathy): audit this class ActionHandler: diff --git a/tests/sentry/workflow_engine/handlers/condition/test_base.py b/tests/sentry/workflow_engine/handlers/condition/test_base.py index e30df91a8ca415..30bcd1b610ef2b 100644 --- a/tests/sentry/workflow_engine/handlers/condition/test_base.py +++ b/tests/sentry/workflow_engine/handlers/condition/test_base.py @@ -46,13 +46,6 @@ def assert_passes(self, data_condition: DataCondition, job: WorkflowJob) -> None def assert_does_not_pass(self, data_condition: DataCondition, job: WorkflowJob) -> None: assert data_condition.evaluate_value(job) != data_condition.get_condition_result() - # Slow conditions are evaluated in delayed processing and take in the results directly - def assert_slow_cond_passes(self, data_condition: DataCondition, value: Any) -> None: - assert data_condition.evaluate_value(value) == data_condition.get_condition_result() - - def assert_slow_cond_does_not_pass(self, data_condition: DataCondition, value: Any) -> None: - assert data_condition.evaluate_value(value) != data_condition.get_condition_result() - # TODO: activity diff --git a/tests/sentry/workflow_engine/handlers/condition/test_event_frequency_handlers.py b/tests/sentry/workflow_engine/handlers/condition/test_event_frequency_handlers.py index 483b7f71a32ee4..5fd4fc6c495462 100644 --- a/tests/sentry/workflow_engine/handlers/condition/test_event_frequency_handlers.py +++ b/tests/sentry/workflow_engine/handlers/condition/test_event_frequency_handlers.py @@ -9,6 +9,7 @@ EventFrequencyCountHandler, ) from sentry.workflow_engine.models.data_condition import Condition +from sentry.workflow_engine.types import WorkflowJob from tests.sentry.workflow_engine.handlers.condition.test_base import ( ConditionTestCase, EventFrequencyQueryTestBase, @@ -27,6 +28,10 @@ class TestEventFrequencyCountCondition(ConditionTestCase): "comparisonType": ComparisonType.COUNT, } + def setUp(self): + super().setUp() + self.job = WorkflowJob({"event": self.group_event}) + def test_count(self): dc = self.create_data_condition( type=self.condition, @@ -34,8 +39,11 @@ def test_count(self): condition_result=True, ) - self.assert_slow_cond_passes(dc, 1001) - self.assert_slow_cond_does_not_pass(dc, 999) + self.job["snuba_results"] = [1001] + self.assert_passes(dc, self.job) + + self.job["snuba_results"] = [999] + self.assert_does_not_pass(dc, self.job) def test_dual_write_count(self): dcg = self.create_data_condition_group() @@ -81,6 +89,10 @@ class TestEventFrequencyPercentCondition(ConditionTestCase): "comparisonType": ComparisonType.PERCENT, } + def setUp(self): + super().setUp() + self.job = WorkflowJob({"event": self.group_event}) + def test_percent(self): dc = self.create_data_condition( type=self.condition, @@ -92,8 +104,11 @@ def test_percent(self): condition_result=True, ) - self.assert_slow_cond_passes(dc, [21, 10]) - self.assert_slow_cond_does_not_pass(dc, [20, 10]) + self.job["snuba_results"] = [21, 10] + self.assert_passes(dc, self.job) + + self.job["snuba_results"] = [20, 10] + self.assert_does_not_pass(dc, self.job) def test_dual_write_percent(self): self.payload.update({"comparisonType": ComparisonType.PERCENT, "comparisonInterval": "1d"}) diff --git a/tests/sentry/workflow_engine/processors/test_workflow.py b/tests/sentry/workflow_engine/processors/test_workflow.py index 8a52a70630f5ce..480dcd0da84de1 100644 --- a/tests/sentry/workflow_engine/processors/test_workflow.py +++ b/tests/sentry/workflow_engine/processors/test_workflow.py @@ -1,13 +1,23 @@ +from datetime import timedelta from unittest import mock +from sentry import buffer from sentry.eventstream.base import GroupState from sentry.grouping.grouptype import ErrorGroupType +from sentry.testutils.helpers.datetime import before_now, freeze_time +from sentry.testutils.helpers.redis import mock_redis_buffer from sentry.workflow_engine.models import DataConditionGroup from sentry.workflow_engine.models.data_condition import Condition -from sentry.workflow_engine.processors.workflow import evaluate_workflow_triggers, process_workflows +from sentry.workflow_engine.processors.workflow import ( + WORKFLOW_ENGINE_BUFFER_LIST_KEY, + evaluate_workflow_triggers, + process_workflows, +) from sentry.workflow_engine.types import WorkflowJob from tests.sentry.workflow_engine.test_base import BaseWorkflowTest +FROZEN_TIME = before_now(days=1).replace(hour=1, minute=30, second=0, microsecond=0) + class TestProcessWorkflows(BaseWorkflowTest): def setUp(self): @@ -105,8 +115,8 @@ def test_no_workflow_trigger(self): assert not triggered_workflows def test_workflow_many_filters(self): - if self.workflow.when_condition_group is not None: - self.workflow.when_condition_group.logic_type = DataConditionGroup.Type.ALL + assert self.workflow.when_condition_group + self.workflow.when_condition_group.update(logic_type=DataConditionGroup.Type.ALL) self.create_data_condition( condition_group=self.workflow.when_condition_group, @@ -118,9 +128,9 @@ def test_workflow_many_filters(self): triggered_workflows = evaluate_workflow_triggers({self.workflow}, self.job) assert triggered_workflows == {self.workflow} - def test_workflow_filterd_out(self): - if self.workflow.when_condition_group is not None: - self.workflow.when_condition_group.logic_type = DataConditionGroup.Type.ALL + def test_workflow_filtered_out(self): + assert self.workflow.when_condition_group + self.workflow.when_condition_group.update(logic_type=DataConditionGroup.Type.ALL) self.create_data_condition( condition_group=self.workflow.when_condition_group, @@ -136,3 +146,97 @@ def test_many_workflows(self): triggered_workflows = evaluate_workflow_triggers({self.workflow, workflow_two}, self.job) assert triggered_workflows == {self.workflow, workflow_two} + + def test_skips_slow_conditions(self): + # triggers workflow if the logic_type is ANY and a condition is met + self.create_data_condition( + condition_group=self.workflow.when_condition_group, + type=Condition.EVENT_FREQUENCY_COUNT, + comparison={ + "interval": "1h", + "value": 100, + }, + condition_result=True, + ) + + triggered_workflows = evaluate_workflow_triggers({self.workflow}, self.job) + assert triggered_workflows == {self.workflow} + + +@freeze_time(FROZEN_TIME) +class TestEnqueueWorkflow(BaseWorkflowTest): + buffer_timestamp = (FROZEN_TIME + timedelta(seconds=1)).timestamp() + + def setUp(self): + ( + self.workflow, + self.detector, + self.detector_workflow, + self.workflow_triggers, + ) = self.create_detector_and_workflow() + + occurrence = self.build_occurrence(evidence_data={"detector_id": self.detector.id}) + self.group, self.event, self.group_event = self.create_group_event( + occurrence=occurrence, + ) + self.job = WorkflowJob({"event": self.group_event}) + self.create_workflow_action(self.workflow) + self.mock_redis_buffer = mock_redis_buffer() + self.mock_redis_buffer.__enter__() + + def tearDown(self): + self.mock_redis_buffer.__exit__(None, None, None) + + def test_enqueues_workflow_all_logic_type(self): + assert self.workflow.when_condition_group + self.workflow.when_condition_group.update(logic_type=DataConditionGroup.Type.ALL) + self.create_data_condition( + condition_group=self.workflow.when_condition_group, + type=Condition.EVENT_FREQUENCY_COUNT, + comparison={ + "interval": "1h", + "value": 100, + }, + condition_result=True, + ) + + triggered_workflows = evaluate_workflow_triggers({self.workflow}, self.job) + assert not triggered_workflows + + process_workflows(self.job) + + project_ids = buffer.backend.get_sorted_set( + WORKFLOW_ENGINE_BUFFER_LIST_KEY, 0, self.buffer_timestamp + ) + assert project_ids + assert project_ids[0][0] == self.project.id + + def test_enqueues_workflow_any_logic_type(self): + assert self.workflow.when_condition_group + self.workflow.when_condition_group.conditions.all().delete() + + self.create_data_condition( + condition_group=self.workflow.when_condition_group, + type=Condition.EVENT_FREQUENCY_COUNT, + comparison={ + "interval": "1h", + "value": 100, + }, + condition_result=True, + ) + self.create_data_condition( + condition_group=self.workflow.when_condition_group, + type=Condition.REGRESSION_EVENT, # fast condition, does not pass + comparison=True, + condition_result=True, + ) + + triggered_workflows = evaluate_workflow_triggers({self.workflow}, self.job) + assert not triggered_workflows + + process_workflows(self.job) + + project_ids = buffer.backend.get_sorted_set( + WORKFLOW_ENGINE_BUFFER_LIST_KEY, 0, self.buffer_timestamp + ) + assert project_ids[0][0] == self.project.id