feat(aci): enqueue workflows for delayed processing (#83548)
cathteng authored Jan 21, 2025
1 parent 6570a65 commit 3caa22e
Showing 10 changed files with 238 additions and 44 deletions.
3 changes: 3 additions & 0 deletions src/sentry/workflow_engine/handlers/condition/__init__.py
@@ -1,5 +1,7 @@
 __all__ = [
     "EventCreatedByDetectorConditionHandler",
+    "EventFrequencyCountHandler",
+    "EventFrequencyPercentHandler",
     "EventSeenCountConditionHandler",
     "EveryEventConditionHandler",
     "ReappearedEventConditionHandler",
@@ -23,6 +25,7 @@
 from .assigned_to_handler import AssignedToConditionHandler
 from .event_attribute_handler import EventAttributeConditionHandler
 from .event_created_by_detector_handler import EventCreatedByDetectorConditionHandler
+from .event_frequency_handlers import EventFrequencyCountHandler, EventFrequencyPercentHandler
 from .event_seen_count_handler import EventSeenCountConditionHandler
 from .every_event_handler import EveryEventConditionHandler
 from .existing_high_priority_issue_handler import ExistingHighPriorityIssueConditionHandler
src/sentry/workflow_engine/handlers/condition/event_frequency_handlers.py
@@ -16,7 +16,7 @@
 )
 from sentry.workflow_engine.models.data_condition import Condition
 from sentry.workflow_engine.registry import condition_handler_registry
-from sentry.workflow_engine.types import DataConditionHandler, DataConditionResult
+from sentry.workflow_engine.types import DataConditionHandler, DataConditionResult, WorkflowJob


 class EventFrequencyConditionHandler(BaseEventFrequencyConditionHandler):
@@ -59,7 +59,7 @@ def get_result(model: TSDBModel, group_ids: list[int]) -> dict[int, int]:


 @condition_handler_registry.register(Condition.EVENT_FREQUENCY_COUNT)
-class EventFrequencyCountHandler(EventFrequencyConditionHandler, DataConditionHandler[int]):
+class EventFrequencyCountHandler(EventFrequencyConditionHandler, DataConditionHandler[WorkflowJob]):
     comparison_json_schema = {
         "type": "object",
         "properties": {
@@ -71,12 +71,16 @@
     }

     @staticmethod
-    def evaluate_value(value: int, comparison: Any) -> DataConditionResult:
-        return value > comparison["value"]
+    def evaluate_value(value: WorkflowJob, comparison: Any) -> DataConditionResult:
+        if len(value.get("snuba_results", [])) != 1:
+            return False
+        return value["snuba_results"][0] > comparison["value"]


 @condition_handler_registry.register(Condition.EVENT_FREQUENCY_PERCENT)
-class EventFrequencyPercentHandler(EventFrequencyConditionHandler, DataConditionHandler[list[int]]):
+class EventFrequencyPercentHandler(
+    EventFrequencyConditionHandler, DataConditionHandler[WorkflowJob]
+):
     comparison_json_schema = {
         "type": "object",
         "properties": {
@@ -89,7 +93,10 @@
     }

     @staticmethod
-    def evaluate_value(value: list[int], comparison: Any) -> DataConditionResult:
-        if len(value) != 2:
+    def evaluate_value(value: WorkflowJob, comparison: Any) -> DataConditionResult:
+        if len(value.get("snuba_results", [])) != 2:
             return False
-        return percent_increase(value[0], value[1]) > comparison["value"]
+        return (
+            percent_increase(value["snuba_results"][0], value["snuba_results"][1])
+            > comparison["value"]
+        )
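With this change, the frequency handlers no longer receive a bare Snuba value; they read it from the job's "snuba_results" field. A minimal sketch of the new contract (the bare dicts stand in for a fully populated WorkflowJob, and the thresholds mirror the tests further down):

    from sentry.workflow_engine.handlers.condition import (
        EventFrequencyCountHandler,
        EventFrequencyPercentHandler,
    )

    # COUNT conditions expect exactly one result bucket.
    assert EventFrequencyCountHandler.evaluate_value({"snuba_results": [1001]}, {"value": 1000})

    # PERCENT conditions expect [current, previous] and compare the percent increase.
    assert EventFrequencyPercentHandler.evaluate_value({"snuba_results": [21, 10]}, {"value": 100})

    # Missing or malformed results fail closed.
    assert EventFrequencyCountHandler.evaluate_value({}, {"value": 0}) is False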
24 changes: 12 additions & 12 deletions src/sentry/workflow_engine/models/data_condition.py
@@ -66,6 +66,18 @@ class Condition(models.TextChoices):
     Condition.NOT_EQUAL: operator.ne,
 }

+SLOW_CONDITIONS = [
+    Condition.EVENT_FREQUENCY_COUNT,
+    Condition.EVENT_FREQUENCY_PERCENT,
+    Condition.EVENT_UNIQUE_USER_FREQUENCY_COUNT,
+    Condition.EVENT_UNIQUE_USER_FREQUENCY_PERCENT,
+    Condition.PERCENT_SESSIONS_COUNT,
+    Condition.PERCENT_SESSIONS_PERCENT,
+    Condition.EVENT_UNIQUE_USER_FREQUENCY_WITH_CONDITIONS_COUNT,
+    Condition.EVENT_UNIQUE_USER_FREQUENCY_WITH_CONDITIONS_PERCENT,
+]
+
+
 T = TypeVar("T")

@@ -140,18 +152,6 @@ def evaluate_value(self, value: T) -> DataConditionResult:
         return self.get_condition_result() if result else None


-SLOW_CONDITIONS = [
-    Condition.EVENT_FREQUENCY_COUNT,
-    Condition.EVENT_FREQUENCY_PERCENT,
-    Condition.EVENT_UNIQUE_USER_FREQUENCY_COUNT,
-    Condition.EVENT_UNIQUE_USER_FREQUENCY_PERCENT,
-    Condition.PERCENT_SESSIONS_COUNT,
-    Condition.PERCENT_SESSIONS_PERCENT,
-    Condition.EVENT_UNIQUE_USER_FREQUENCY_WITH_CONDITIONS_COUNT,
-    Condition.EVENT_UNIQUE_USER_FREQUENCY_WITH_CONDITIONS_PERCENT,
-]
-
-
 def is_slow_condition(cond: DataCondition) -> bool:
     return Condition(cond.type) in SLOW_CONDITIONS
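The SLOW_CONDITIONS list simply moves above its first use; is_slow_condition itself is unchanged. As a hedged sketch of the split it enables (the partitioning helper below is illustrative, not part of this commit):

    from sentry.workflow_engine.models.data_condition import is_slow_condition

    def split_conditions(conditions):
        # Fast conditions are cheap enough to evaluate inline during ingestion;
        # slow ones require aggregate (e.g. Snuba) queries and are deferred.
        fast = [c for c in conditions if not is_slow_condition(c)]
        slow = [c for c in conditions if is_slow_condition(c)]
        return fast, slow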
13 changes: 13 additions & 0 deletions src/sentry/workflow_engine/models/workflow.py
@@ -9,6 +9,7 @@
 from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model, sane_repr
 from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey
 from sentry.models.owner_base import OwnerModel
+from sentry.workflow_engine.models.data_condition import DataCondition, is_slow_condition
 from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group
 from sentry.workflow_engine.types import WorkflowJob

@@ -79,6 +80,18 @@ def evaluate_trigger_conditions(self, job: WorkflowJob) -> bool:
         return evaluation


+def get_slow_conditions(workflow: Workflow) -> list[DataCondition]:
+    if not workflow.when_condition_group:
+        return []
+
+    slow_conditions = [
+        condition
+        for condition in workflow.when_condition_group.conditions.all()
+        if is_slow_condition(condition)
+    ]
+    return slow_conditions
+
+
 @receiver(pre_save, sender=Workflow)
 def enforce_config_schema(sender, instance: Workflow, **kwargs):
     instance.validate_config(instance.config_schema)
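A short sketch of how this helper feeds the enqueue decision in processors/workflow.py below (the wrapper function is hypothetical):

    from sentry.workflow_engine.models.workflow import Workflow, get_slow_conditions

    def should_enqueue(workflow: Workflow) -> bool:
        # A workflow that fails its fast trigger checks is still worth deferring
        # when its WHEN condition group contains at least one slow condition.
        return bool(get_slow_conditions(workflow))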
7 changes: 2 additions & 5 deletions src/sentry/workflow_engine/processors/data_condition_group.py
@@ -28,11 +28,6 @@ def evaluate_condition_group(
     results = []
     conditions = get_data_conditions_for_group(data_condition_group.id)

-    # TODO - @saponifi3d
-    # Split the conditions into fast and slow conditions
-    # Evaluate the fast conditions first, if any are met, return early
-    # Enqueue the slow conditions to be evaluated later
-
     if len(conditions) == 0:
         # if we don't have any conditions, always return True
         return True, []

@@ -54,12 +49,14 @@
     if data_condition_group.logic_type == data_condition_group.Type.NONE:
         # if we get to this point, no conditions were met
         return True, []
+
     elif data_condition_group.logic_type == data_condition_group.Type.ANY:
         is_any_condition_met = any([result[0] for result in results])

         if is_any_condition_met:
             condition_results = [result[1] for result in results if result[0]]
             return is_any_condition_met, condition_results
+
     elif data_condition_group.logic_type == data_condition_group.Type.ALL:
         conditions_met = [result[0] for result in results]
         is_all_conditions_met = all(conditions_met)
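For reference, a simplified sketch of the logic_type semantics this function implements (string literals stand in for the DataConditionGroup.Type enum, and the real function short-circuits and returns early in several places):

    def combine(logic_type: str, results: list[tuple[bool, object]]) -> tuple[bool, list[object]]:
        met = [ok for ok, _ in results]
        if logic_type == "none":
            # The group passes only when no condition matched.
            return not any(met), []
        if logic_type == "any":
            passed = [res for ok, res in results if ok]
            return bool(passed), passed
        if logic_type == "all":
            passed = [res for ok, res in results if ok]
            return all(met), passed if all(met) else []
        raise ValueError(f"unknown logic_type: {logic_type}")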
65 changes: 63 additions & 2 deletions src/sentry/workflow_engine/processors/workflow.py
@@ -1,22 +1,83 @@
 import logging
+from collections import defaultdict

 import sentry_sdk

-from sentry.utils import metrics
-from sentry.workflow_engine.models import Detector, Workflow
+from sentry import buffer
+from sentry.utils import json, metrics
+from sentry.workflow_engine.models import Detector, Workflow, WorkflowDataConditionGroup
+from sentry.workflow_engine.models.workflow import get_slow_conditions
 from sentry.workflow_engine.processors.action import evaluate_workflow_action_filters
 from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group
 from sentry.workflow_engine.processors.detector import get_detector_by_event
+from sentry.workflow_engine.types import WorkflowJob

 logger = logging.getLogger(__name__)

+WORKFLOW_ENGINE_BUFFER_LIST_KEY = "workflow_engine_delayed_processing_buffer"
+
+
+def get_data_condition_groups_to_fire(
+    workflows: set[Workflow], job: WorkflowJob
+) -> dict[int, list[int]]:
+    workflow_action_groups: dict[int, list[int]] = defaultdict(list)
+
+    workflow_ids = {workflow.id for workflow in workflows}
+
+    workflow_dcgs = WorkflowDataConditionGroup.objects.filter(
+        workflow_id__in=workflow_ids
+    ).select_related("condition_group", "workflow")
+
+    for workflow_dcg in workflow_dcgs:
+        action_condition = workflow_dcg.condition_group
+        evaluation, result = evaluate_condition_group(action_condition, job)
+
+        if evaluation:
+            workflow_action_groups[workflow_dcg.workflow_id].append(action_condition.id)
+
+    return workflow_action_groups
+
+
+def enqueue_workflows(
+    workflows: set[Workflow],
+    job: WorkflowJob,
+) -> None:
+    event = job["event"]
+    project_id = event.group.project.id
+    workflow_action_groups = get_data_condition_groups_to_fire(workflows, job)
+
+    for workflow in workflows:
+        buffer.backend.push_to_sorted_set(key=WORKFLOW_ENGINE_BUFFER_LIST_KEY, value=project_id)
+
+        action_filters = workflow_action_groups.get(workflow.id, [])
+        if not action_filters:
+            continue
+
+        action_filter_fields = ":".join(map(str, action_filters))
+
+        value = json.dumps({"event_id": event.event_id, "occurrence_id": event.occurrence_id})
+        buffer.backend.push_to_hash(
+            model=Workflow,
+            filters={"project": project_id},
+            field=f"{workflow.id}:{event.group.id}:{action_filter_fields}",
+            value=value,
+        )
+
+
 def evaluate_workflow_triggers(workflows: set[Workflow], job: WorkflowJob) -> set[Workflow]:
     triggered_workflows: set[Workflow] = set()
+    workflows_to_enqueue: set[Workflow] = set()

     for workflow in workflows:
         if workflow.evaluate_trigger_conditions(job):
             triggered_workflows.add(workflow)
+        else:
+            if get_slow_conditions(workflow):
+                # enqueue to be evaluated later
+                workflows_to_enqueue.add(workflow)
+
+    if workflows_to_enqueue:
+        enqueue_workflows(workflows_to_enqueue, job)

     return triggered_workflows
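To make the buffer keying concrete, a worked example of the writes enqueue_workflows performs (all IDs and the event payload are hypothetical): for project 42, a workflow 7 whose action filter groups 5 and 9 passed, and an event in group 123, the calls reduce to:

    from sentry import buffer
    from sentry.workflow_engine.models import Workflow

    # The project is tracked in a sorted set so delayed processing can discover it.
    buffer.backend.push_to_sorted_set(
        key="workflow_engine_delayed_processing_buffer", value=42
    )
    # Each event is recorded under a composite hash field of
    # workflow_id:group_id:action_filter_group_ids.
    buffer.backend.push_to_hash(
        model=Workflow,
        filters={"project": 42},
        field="7:123:5:9",
        value='{"event_id": "<event uuid>", "occurrence_id": null}',
    )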
1 change: 1 addition & 0 deletions src/sentry/workflow_engine/types.py
@@ -46,6 +46,7 @@ class WorkflowJob(EventJob, total=False):
     has_alert: bool
     has_escalated: bool
     workflow: Workflow
+    snuba_results: list[int]  # TODO - @saponifi3d / TODO(cathy): audit this


 class ActionHandler:
7 changes: 0 additions & 7 deletions tests/sentry/workflow_engine/handlers/condition/test_base.py
@@ -46,13 +46,6 @@ def assert_passes(self, data_condition: DataCondition, job: WorkflowJob) -> None:
     def assert_does_not_pass(self, data_condition: DataCondition, job: WorkflowJob) -> None:
         assert data_condition.evaluate_value(job) != data_condition.get_condition_result()

-    # Slow conditions are evaluated in delayed processing and take in the results directly
-    def assert_slow_cond_passes(self, data_condition: DataCondition, value: Any) -> None:
-        assert data_condition.evaluate_value(value) == data_condition.get_condition_result()
-
-    def assert_slow_cond_does_not_pass(self, data_condition: DataCondition, value: Any) -> None:
-        assert data_condition.evaluate_value(value) != data_condition.get_condition_result()
-
     # TODO: activity
tests/sentry/workflow_engine/handlers/condition/test_event_frequency_handlers.py
@@ -9,6 +9,7 @@
     EventFrequencyCountHandler,
 )
 from sentry.workflow_engine.models.data_condition import Condition
+from sentry.workflow_engine.types import WorkflowJob
 from tests.sentry.workflow_engine.handlers.condition.test_base import (
     ConditionTestCase,
     EventFrequencyQueryTestBase,
@@ -27,15 +28,22 @@ class TestEventFrequencyCountCondition(ConditionTestCase):
         "comparisonType": ComparisonType.COUNT,
     }

+    def setUp(self):
+        super().setUp()
+        self.job = WorkflowJob({"event": self.group_event})
+
     def test_count(self):
         dc = self.create_data_condition(
             type=self.condition,
             comparison={"interval": "1h", "value": 1000},
             condition_result=True,
         )

-        self.assert_slow_cond_passes(dc, 1001)
-        self.assert_slow_cond_does_not_pass(dc, 999)
+        self.job["snuba_results"] = [1001]
+        self.assert_passes(dc, self.job)
+
+        self.job["snuba_results"] = [999]
+        self.assert_does_not_pass(dc, self.job)

     def test_dual_write_count(self):
         dcg = self.create_data_condition_group()
@@ -81,6 +89,10 @@ class TestEventFrequencyPercentCondition(ConditionTestCase):
         "comparisonType": ComparisonType.PERCENT,
     }

+    def setUp(self):
+        super().setUp()
+        self.job = WorkflowJob({"event": self.group_event})
+
     def test_percent(self):
@@ -92,8 +104,11 @@ def test_percent(self):
             condition_result=True,
         )

-        self.assert_slow_cond_passes(dc, [21, 10])
-        self.assert_slow_cond_does_not_pass(dc, [20, 10])
+        self.job["snuba_results"] = [21, 10]
+        self.assert_passes(dc, self.job)
+
+        self.job["snuba_results"] = [20, 10]
+        self.assert_does_not_pass(dc, self.job)

     def test_dual_write_percent(self):
         self.payload.update({"comparisonType": ComparisonType.PERCENT, "comparisonInterval": "1d"})