Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(aci): enqueue workflows for delayed processing #83548

Merged
merged 11 commits into from
Jan 21, 2025
3 changes: 3 additions & 0 deletions src/sentry/workflow_engine/handlers/condition/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
__all__ = [
"EventCreatedByDetectorConditionHandler",
"EventFrequencyCountHandler",
"EventFrequencyPercentHandler",
"EventSeenCountConditionHandler",
"EveryEventConditionHandler",
"ReappearedEventConditionHandler",
Expand All @@ -23,6 +25,7 @@
from .assigned_to_handler import AssignedToConditionHandler
from .event_attribute_handler import EventAttributeConditionHandler
from .event_created_by_detector_handler import EventCreatedByDetectorConditionHandler
from .event_frequency_handlers import EventFrequencyCountHandler, EventFrequencyPercentHandler
from .event_seen_count_handler import EventSeenCountConditionHandler
from .every_event_handler import EveryEventConditionHandler
from .existing_high_priority_issue_handler import ExistingHighPriorityIssueConditionHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
)
from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.registry import condition_handler_registry
from sentry.workflow_engine.types import DataConditionHandler, DataConditionResult
from sentry.workflow_engine.types import DataConditionHandler, DataConditionResult, WorkflowJob


class EventFrequencyConditionHandler(BaseEventFrequencyConditionHandler):
Expand Down Expand Up @@ -59,7 +59,7 @@ def get_result(model: TSDBModel, group_ids: list[int]) -> dict[int, int]:


@condition_handler_registry.register(Condition.EVENT_FREQUENCY_COUNT)
class EventFrequencyCountHandler(EventFrequencyConditionHandler, DataConditionHandler[int]):
class EventFrequencyCountHandler(EventFrequencyConditionHandler, DataConditionHandler[WorkflowJob]):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update this to evaluate WorkflowJob so we can reuse evaluate_workflow_triggers in delayed processing, and populate snuba_results inside WorkflowJob after we make the snuba queries

comparison_json_schema = {
"type": "object",
"properties": {
Expand All @@ -71,12 +71,16 @@ class EventFrequencyCountHandler(EventFrequencyConditionHandler, DataConditionHa
}

@staticmethod
def evaluate_value(value: int, comparison: Any) -> DataConditionResult:
return value > comparison["value"]
def evaluate_value(value: WorkflowJob, comparison: Any) -> DataConditionResult:
if len(value.get("snuba_results", [])) != 1:
return False
return value["snuba_results"][0] > comparison["value"]


@condition_handler_registry.register(Condition.EVENT_FREQUENCY_PERCENT)
class EventFrequencyPercentHandler(EventFrequencyConditionHandler, DataConditionHandler[list[int]]):
class EventFrequencyPercentHandler(
EventFrequencyConditionHandler, DataConditionHandler[WorkflowJob]
):
comparison_json_schema = {
"type": "object",
"properties": {
Expand All @@ -89,7 +93,10 @@ class EventFrequencyPercentHandler(EventFrequencyConditionHandler, DataCondition
}

@staticmethod
def evaluate_value(value: list[int], comparison: Any) -> DataConditionResult:
if len(value) != 2:
def evaluate_value(value: WorkflowJob, comparison: Any) -> DataConditionResult:
if len(value.get("snuba_results", [])) != 2:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a common scenario or a weird snuba blip?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's possible we don't have the snuba results when we are evaluating the triggers outside of delayed processing

return False
return percent_increase(value[0], value[1]) > comparison["value"]
return (
percent_increase(value["snuba_results"][0], value["snuba_results"][1])
> comparison["value"]
)
24 changes: 12 additions & 12 deletions src/sentry/workflow_engine/models/data_condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ class Condition(models.TextChoices):
Condition.NOT_EQUAL: operator.ne,
}

SLOW_CONDITIONS = [
Condition.EVENT_FREQUENCY_COUNT,
Condition.EVENT_FREQUENCY_PERCENT,
Condition.EVENT_UNIQUE_USER_FREQUENCY_COUNT,
Condition.EVENT_UNIQUE_USER_FREQUENCY_PERCENT,
Condition.PERCENT_SESSIONS_COUNT,
Condition.PERCENT_SESSIONS_PERCENT,
Condition.EVENT_UNIQUE_USER_FREQUENCY_WITH_CONDITIONS_COUNT,
Condition.EVENT_UNIQUE_USER_FREQUENCY_WITH_CONDITIONS_PERCENT,
]


T = TypeVar("T")


Expand Down Expand Up @@ -140,18 +152,6 @@ def evaluate_value(self, value: T) -> DataConditionResult:
return self.get_condition_result() if result else None


SLOW_CONDITIONS = [
Condition.EVENT_FREQUENCY_COUNT,
Condition.EVENT_FREQUENCY_PERCENT,
Condition.EVENT_UNIQUE_USER_FREQUENCY_COUNT,
Condition.EVENT_UNIQUE_USER_FREQUENCY_PERCENT,
Condition.PERCENT_SESSIONS_COUNT,
Condition.PERCENT_SESSIONS_PERCENT,
Condition.EVENT_UNIQUE_USER_FREQUENCY_WITH_CONDITIONS_COUNT,
Condition.EVENT_UNIQUE_USER_FREQUENCY_WITH_CONDITIONS_PERCENT,
]


def is_slow_condition(cond: DataCondition) -> bool:
    """Return True when the condition's type is a slow (Snuba-backed) condition."""
    condition_type = Condition(cond.type)
    return condition_type in SLOW_CONDITIONS

Expand Down
13 changes: 13 additions & 0 deletions src/sentry/workflow_engine/models/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model, sane_repr
from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey
from sentry.models.owner_base import OwnerModel
from sentry.workflow_engine.models.data_condition import DataCondition, is_slow_condition
from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group
from sentry.workflow_engine.types import WorkflowJob

Expand Down Expand Up @@ -77,6 +78,18 @@ def evaluate_trigger_conditions(self, job: WorkflowJob) -> bool:
return evaluation


def get_slow_conditions(workflow: Workflow) -> list[DataCondition]:
    """Return the slow conditions attached to the workflow's WHEN condition group.

    Returns an empty list when the workflow has no trigger condition group.
    """
    when_group = workflow.when_condition_group
    if not when_group:
        return []

    return [cond for cond in when_group.conditions.all() if is_slow_condition(cond)]


@receiver(pre_save, sender=Workflow)
def enforce_config_schema(sender, instance: Workflow, **kwargs):
    # Validate the workflow's config against its schema before every save,
    # so invalid configs never reach the database.
    instance.validate_config(instance.config_schema)
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ def evaluate_condition_group(
results = []
conditions = get_data_conditions_for_group(data_condition_group.id)

# TODO - @saponifi3d
# Split the conditions into fast and slow conditions
# Evaluate the fast conditions first, if any are met, return early
# Enqueue the slow conditions to be evaluated later

if len(conditions) == 0:
# if we don't have any conditions, always return True
return True, []
Expand All @@ -54,12 +49,14 @@ def evaluate_condition_group(
if data_condition_group.logic_type == data_condition_group.Type.NONE:
# if we get to this point, no conditions were met
return True, []

elif data_condition_group.logic_type == data_condition_group.Type.ANY:
is_any_condition_met = any([result[0] for result in results])

if is_any_condition_met:
condition_results = [result[1] for result in results if result[0]]
return is_any_condition_met, condition_results

elif data_condition_group.logic_type == data_condition_group.Type.ALL:
conditions_met = [result[0] for result in results]
is_all_conditions_met = all(conditions_met)
Expand Down
64 changes: 62 additions & 2 deletions src/sentry/workflow_engine/processors/workflow.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,82 @@
import logging
from collections import defaultdict

import sentry_sdk

from sentry.utils import metrics
from sentry.workflow_engine.models import Detector, Workflow
from sentry import buffer
from sentry.utils import json, metrics
from sentry.workflow_engine.models import Detector, Workflow, WorkflowDataConditionGroup
from sentry.workflow_engine.models.workflow import get_slow_conditions
from sentry.workflow_engine.processors.action import evaluate_workflow_action_filters
from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group
from sentry.workflow_engine.processors.detector import get_detector_by_event
from sentry.workflow_engine.types import WorkflowJob

logger = logging.getLogger(__name__)

WORKFLOW_ENGINE_BUFFER_LIST_KEY = "workflow_engine_delayed_processing_buffer"


def get_data_condition_groups_to_fire(
    workflows: set[Workflow], job: WorkflowJob
) -> dict[int, list[int]]:
    """Evaluate each workflow's action-filter condition groups against the job.

    Returns a mapping of workflow id -> ids of the condition groups whose
    evaluation passed for this job.
    """
    passing_groups_by_workflow: dict[int, list[int]] = defaultdict(list)

    dcg_queryset = WorkflowDataConditionGroup.objects.filter(
        workflow_id__in={workflow.id for workflow in workflows}
    ).select_related("condition_group", "workflow")

    for workflow_dcg in dcg_queryset:
        condition_group = workflow_dcg.condition_group
        passed, _ = evaluate_condition_group(condition_group, job)
        if passed:
            passing_groups_by_workflow[workflow_dcg.workflow_id].append(condition_group.id)

    return passing_groups_by_workflow


def enqueue_workflows(
    workflows: set[Workflow],
    job: WorkflowJob,
) -> None:
    """Buffer workflows with slow conditions for delayed processing.

    The event's project id is pushed to a sorted set so the delayed processor
    knows which projects have pending work; for each workflow whose IF
    (action-filter) condition groups passed, the event is recorded in a
    per-project hash keyed by workflow, group, and the passing group ids.
    """
    event = job["event"]
    project_id = event.group.project.id
    passing_dcgs_by_workflow = get_data_condition_groups_to_fire(workflows, job)

    for workflow in workflows:
        buffer.backend.push_to_sorted_set(key=WORKFLOW_ENGINE_BUFFER_LIST_KEY, value=project_id)

        # The IF (action filter) data condition groups that passed for this workflow.
        action_filter_dcgs = passing_dcgs_by_workflow.get(workflow.id, [])
        if not action_filter_dcgs:
            continue

        dcg_field = ":".join(str(dcg_id) for dcg_id in action_filter_dcgs)
        payload = json.dumps({"event_id": event.event_id, "occurrence_id": event.occurrence_id})
        buffer.backend.push_to_hash(
            model=Workflow,
            filters={"project": project_id},
            field=f"{workflow.id}:{event.group.id}:{dcg_field}",
            value=payload,
        )


def evaluate_workflow_triggers(workflows: set[Workflow], job: WorkflowJob) -> set[Workflow]:
    """Evaluate the WHEN (trigger) conditions for each workflow against the job.

    Workflows whose fast conditions pass are returned as triggered. Workflows
    whose fast conditions do not pass but that have slow (Snuba-backed)
    conditions are enqueued to the buffer for delayed evaluation instead of
    being dropped.
    """
    triggered_workflows: set[Workflow] = set()
    workflows_to_enqueue: set[Workflow] = set()

    for workflow in workflows:
        if workflow.evaluate_trigger_conditions(job):
            triggered_workflows.add(workflow)
        elif get_slow_conditions(workflow):
            # Fast conditions failed, but slow conditions may still pass --
            # defer their evaluation to delayed processing.
            workflows_to_enqueue.add(workflow)

    # Skip the buffer/query work entirely when nothing needs delayed evaluation.
    if workflows_to_enqueue:
        enqueue_workflows(workflows_to_enqueue, job)

    return triggered_workflows

Expand Down
1 change: 1 addition & 0 deletions src/sentry/workflow_engine/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class WorkflowJob(EventJob, total=False):
has_alert: bool
has_escalated: bool
workflow: Workflow
snuba_results: list[int]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 is the value in this that we can re-use evaluate_workflows method?

i think this is a bit of a smell that the abstraction might not be quite right in either delayed processing or the evaluate_workflow_triggers 🤔

mind adding a TODO here so i can come back and take a look? i'm not sure if this is the best approach, but seems okay for now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, the value is that we can reuse at least the evaluate_workflow_triggers function. we'll already have processed the actions we can possibly fire before enqueuing, so all we need to do is process the slow conditions — but I'm also not sure if it's the best way to do it



class ActionHandler:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,6 @@ def assert_passes(self, data_condition: DataCondition, job: WorkflowJob) -> None
def assert_does_not_pass(self, data_condition: DataCondition, job: WorkflowJob) -> None:
assert data_condition.evaluate_value(job) != data_condition.get_condition_result()

# Slow conditions are evaluated in delayed processing and take in the results directly
def assert_slow_cond_passes(self, data_condition: DataCondition, value: Any) -> None:
assert data_condition.evaluate_value(value) == data_condition.get_condition_result()

def assert_slow_cond_does_not_pass(self, data_condition: DataCondition, value: Any) -> None:
assert data_condition.evaluate_value(value) != data_condition.get_condition_result()

# TODO: activity


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
EventFrequencyCountHandler,
)
from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.types import WorkflowJob
from tests.sentry.workflow_engine.handlers.condition.test_base import (
ConditionTestCase,
EventFrequencyQueryTestBase,
Expand All @@ -27,15 +28,22 @@ class TestEventFrequencyCountCondition(ConditionTestCase):
"comparisonType": ComparisonType.COUNT,
}

def setUp(self):
super().setUp()
self.job = WorkflowJob({"event": self.group_event})

def test_count(self):
dc = self.create_data_condition(
type=self.condition,
comparison={"interval": "1h", "value": 1000},
condition_result=True,
)

self.assert_slow_cond_passes(dc, 1001)
self.assert_slow_cond_does_not_pass(dc, 999)
self.job["snuba_results"] = [1001]
self.assert_passes(dc, self.job)

self.job["snuba_results"] = [999]
self.assert_does_not_pass(dc, self.job)

def test_dual_write_count(self):
dcg = self.create_data_condition_group()
Expand Down Expand Up @@ -81,6 +89,10 @@ class TestEventFrequencyPercentCondition(ConditionTestCase):
"comparisonType": ComparisonType.PERCENT,
}

def setUp(self):
super().setUp()
self.job = WorkflowJob({"event": self.group_event})

def test_percent(self):
dc = self.create_data_condition(
type=self.condition,
Expand All @@ -92,8 +104,11 @@ def test_percent(self):
condition_result=True,
)

self.assert_slow_cond_passes(dc, [21, 10])
self.assert_slow_cond_does_not_pass(dc, [20, 10])
self.job["snuba_results"] = [21, 10]
self.assert_passes(dc, self.job)

self.job["snuba_results"] = [20, 10]
self.assert_does_not_pass(dc, self.job)

def test_dual_write_percent(self):
self.payload.update({"comparisonType": ComparisonType.PERCENT, "comparisonInterval": "1d"})
Expand Down
Loading
Loading