feat(aci): enqueue workflows for delayed processing #83548

Merged · 11 commits · Jan 21, 2025
3 changes: 3 additions & 0 deletions src/sentry/workflow_engine/handlers/condition/__init__.py
@@ -1,5 +1,7 @@
 __all__ = [
     "EventCreatedByDetectorConditionHandler",
+    "EventFrequencyCountHandler",
+    "EventFrequencyPercentHandler",
     "EventSeenCountConditionHandler",
     "EveryEventConditionHandler",
     "ReappearedEventConditionHandler",
@@ -23,6 +25,7 @@
 from .assigned_to_handler import AssignedToConditionHandler
 from .event_attribute_handler import EventAttributeConditionHandler
 from .event_created_by_detector_handler import EventCreatedByDetectorConditionHandler
+from .event_frequency_handlers import EventFrequencyCountHandler, EventFrequencyPercentHandler
 from .event_seen_count_handler import EventSeenCountConditionHandler
 from .every_event_handler import EveryEventConditionHandler
 from .existing_high_priority_issue_handler import ExistingHighPriorityIssueConditionHandler
37 changes: 25 additions & 12 deletions src/sentry/workflow_engine/models/data_condition.py
@@ -66,6 +66,18 @@ class Condition(models.TextChoices):
     Condition.NOT_EQUAL: operator.ne,
 }
 
+SLOW_CONDITIONS = [
+    Condition.EVENT_FREQUENCY_COUNT,
+    Condition.EVENT_FREQUENCY_PERCENT,
+    Condition.EVENT_UNIQUE_USER_FREQUENCY_COUNT,
+    Condition.EVENT_UNIQUE_USER_FREQUENCY_PERCENT,
+    Condition.PERCENT_SESSIONS_COUNT,
+    Condition.PERCENT_SESSIONS_PERCENT,
+    Condition.EVENT_UNIQUE_USER_FREQUENCY_WITH_CONDITIONS_COUNT,
+    Condition.EVENT_UNIQUE_USER_FREQUENCY_WITH_CONDITIONS_PERCENT,
+]
+
+
 T = TypeVar("T")
 
 
@@ -140,22 +152,23 @@ def evaluate_value(self, value: T) -> DataConditionResult:
         return self.get_condition_result() if result else None
 
 
-SLOW_CONDITIONS = [
-    Condition.EVENT_FREQUENCY_COUNT,
-    Condition.EVENT_FREQUENCY_PERCENT,
-    Condition.EVENT_UNIQUE_USER_FREQUENCY_COUNT,
-    Condition.EVENT_UNIQUE_USER_FREQUENCY_PERCENT,
-    Condition.PERCENT_SESSIONS_COUNT,
-    Condition.PERCENT_SESSIONS_PERCENT,
-    Condition.EVENT_UNIQUE_USER_FREQUENCY_WITH_CONDITIONS_COUNT,
-    Condition.EVENT_UNIQUE_USER_FREQUENCY_WITH_CONDITIONS_PERCENT,
-]
-
-
 def is_slow_condition(cond: DataCondition) -> bool:
     return Condition(cond.type) in SLOW_CONDITIONS
 
 
+def split_fast_slow_conditions(
+    conditions: list[DataCondition],
+) -> tuple[list[DataCondition], list[DataCondition]]:
+    fast_conditions = []
+    slow_conditions = []
+    for condition in conditions:
+        if is_slow_condition(condition):
+            slow_conditions.append(condition)
+        else:
+            fast_conditions.append(condition)
+    return fast_conditions, slow_conditions
+
+
 @receiver(pre_save, sender=DataCondition)
 def enforce_comparison_schema(sender, instance: DataCondition, **kwargs):
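Illustrative aside (not part of the PR): a standalone sketch of how the fast/slow partition behaves. `FakeCondition` and the type strings are hypothetical stand-ins for `DataCondition` rows and `Condition` enum values; the only assumption is that `is_slow_condition` checks membership in `SLOW_CONDITIONS`.

```python
# Standalone sketch of the fast/slow split added above.
from dataclasses import dataclass

SLOW_CONDITION_TYPES = {"event_frequency_count", "event_frequency_percent"}  # illustrative


@dataclass
class FakeCondition:  # hypothetical stand-in for a DataCondition row
    type: str


def is_slow_condition(cond: FakeCondition) -> bool:
    return cond.type in SLOW_CONDITION_TYPES


def split_fast_slow_conditions(
    conditions: list[FakeCondition],
) -> tuple[list[FakeCondition], list[FakeCondition]]:
    fast, slow = [], []
    for condition in conditions:
        # slow conditions require Snuba queries, so they are deferred
        (slow if is_slow_condition(condition) else fast).append(condition)
    return fast, slow


fast, slow = split_fast_slow_conditions(
    [FakeCondition("every_event"), FakeCondition("event_frequency_count")]
)
assert [c.type for c in fast] == ["every_event"]
assert [c.type for c in slow] == ["event_frequency_count"]
```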
13 changes: 13 additions & 0 deletions src/sentry/workflow_engine/models/workflow.py
@@ -9,6 +9,7 @@
 from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model, sane_repr
 from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey
 from sentry.models.owner_base import OwnerModel
+from sentry.workflow_engine.models.data_condition import DataCondition, is_slow_condition
 from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group
 from sentry.workflow_engine.types import WorkflowJob
 
@@ -77,6 +78,18 @@ def evaluate_trigger_conditions(self, job: WorkflowJob) -> bool:
         return evaluation
 
 
+def get_slow_conditions(workflow: Workflow) -> list[DataCondition]:
+    if not workflow.when_condition_group:
+        return []
+
+    slow_conditions = [
+        condition
+        for condition in workflow.when_condition_group.conditions.all()
+        if is_slow_condition(condition)
+    ]
+    return slow_conditions
+
+
 @receiver(pre_save, sender=Workflow)
 def enforce_config_schema(sender, instance: Workflow, **kwargs):
     instance.validate_config(instance.config_schema)
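The caller-side contract is worth spelling out: a workflow with no WHEN condition group can never be enqueued for delayed evaluation, since `get_slow_conditions` returns an empty list for it. A minimal sketch of the check the processor performs (the helper name `should_enqueue_on_miss` is hypothetical, not introduced by this PR):

```python
# Hypothetical helper mirroring the check evaluate_workflow_triggers performs
# when a workflow's trigger conditions do not pass on the fast path.
def should_enqueue_on_miss(workflow: Workflow) -> bool:
    # get_slow_conditions() returns [] both when when_condition_group is None
    # and when the group holds only fast conditions, so truthiness covers both.
    return bool(get_slow_conditions(workflow))
```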
20 changes: 11 additions & 9 deletions src/sentry/workflow_engine/processors/data_condition_group.py
@@ -3,6 +3,7 @@
 
 from sentry.utils.function_cache import cache_func_for_models
 from sentry.workflow_engine.models import DataCondition, DataConditionGroup
+from sentry.workflow_engine.models.data_condition import split_fast_slow_conditions
 from sentry.workflow_engine.types import ProcessedDataConditionResult
 
 logger = logging.getLogger(__name__)
@@ -28,16 +29,14 @@ def evaluate_condition_group(
     results = []
     conditions = get_data_conditions_for_group(data_condition_group.id)
 
-    # TODO - @saponifi3d
-    # Split the conditions into fast and slow conditions
-    # Evaluate the fast conditions first, if any are met, return early
-    # Enqueue the slow conditions to be evaluated later
-
     if len(conditions) == 0:
         # if we don't have any conditions, always return True
        return True, []
 
-    for condition in conditions:
+    fast_conditions, slow_conditions = split_fast_slow_conditions(conditions)
+    has_slow_conditions = bool(slow_conditions)
+
+    for condition in fast_conditions:
         evaluation_result = condition.evaluate_value(value)
         is_condition_triggered = evaluation_result is not None
 
@@ -52,21 +51,24 @@ def evaluate_condition_group(
         results.append((is_condition_triggered, evaluation_result))
 
     if data_condition_group.logic_type == data_condition_group.Type.NONE:
-        # if we get to this point, no conditions were met
-        return True, []
+        # if we get to this point + we don't have slow conditions, return True
+        return not has_slow_conditions, []
 
     elif data_condition_group.logic_type == data_condition_group.Type.ANY:
         is_any_condition_met = any([result[0] for result in results])
 
         if is_any_condition_met:
             condition_results = [result[1] for result in results if result[0]]
             return is_any_condition_met, condition_results
 
     elif data_condition_group.logic_type == data_condition_group.Type.ALL:
         conditions_met = [result[0] for result in results]
         is_all_conditions_met = all(conditions_met)
 
         if is_all_conditions_met:
             condition_results = [result[1] for result in results if result[0]]
-            return is_all_conditions_met, condition_results
+            # if all conditions are met so far + we don't have slow conditions, return True
+            return not has_slow_conditions, condition_results
 
     return False, []
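To make the new early-return rules concrete, here is a pure-Python sketch of the decision table. It assumes, as the diff implies, that `results` holds only fast-condition outcomes and that the NONE logic type still short-circuits to False inside the loop when a fast condition triggers (that part of the loop is collapsed in the diff); the logic-type strings are illustrative, where the real code dispatches on `DataConditionGroup.Type`.

```python
# Sketch of the group-level decision, given fast results and pending slow conditions.
def group_passes(logic_type: str, fast_results: list[bool], has_slow: bool) -> bool:
    if not fast_results and not has_slow:
        return True  # no conditions at all: always passes
    if logic_type == "none":
        # passes only when nothing triggered AND no slow conditions remain pending
        return not any(fast_results) and not has_slow
    if logic_type == "any":
        # one fast hit is enough; pending slow conditions cannot un-trigger ANY
        return any(fast_results)
    if logic_type == "all":
        # every fast condition passed, but pending slow conditions keep it undecided
        return bool(fast_results) and all(fast_results) and not has_slow
    return False
```

Groups that come back False here while slow conditions are pending are exactly the ones the processor change below enqueues for delayed evaluation.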
80 changes: 75 additions & 5 deletions src/sentry/workflow_engine/processors/workflow.py
@@ -1,24 +1,92 @@
 import logging
+import random
+from collections import defaultdict
 
 import sentry_sdk
 
-from sentry.utils import metrics
-from sentry.workflow_engine.models import Detector, Workflow
+from sentry import buffer
+from sentry.models.project import Project
+from sentry.utils import json, metrics
+from sentry.workflow_engine.models import Detector, Workflow, WorkflowDataConditionGroup
+from sentry.workflow_engine.models.workflow import get_slow_conditions
 from sentry.workflow_engine.processors.action import evaluate_workflow_action_filters
 from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group
 from sentry.workflow_engine.processors.detector import get_detector_by_event
 from sentry.workflow_engine.types import WorkflowJob
 
 logger = logging.getLogger(__name__)
 
+WORKFLOW_ENGINE_PROJECT_ID_BUFFER_LIST_KEY = "workflow_engine_project_id_buffer_list"
+
 
-def evaluate_workflow_triggers(workflows: set[Workflow], job: WorkflowJob) -> set[Workflow]:
+def get_data_condition_groups_to_fire(workflows: set[Workflow], job) -> dict[int, list[int]]:
+    workflow_action_groups: dict[int, list[int]] = defaultdict(list)
+
+    workflow_ids = {workflow.id for workflow in workflows}
+
+    workflow_dcgs = WorkflowDataConditionGroup.objects.filter(
+        workflow_id__in=workflow_ids
+    ).select_related("condition_group", "workflow")
+
+    for workflow_dcg in workflow_dcgs:
+        action_condition = workflow_dcg.condition_group
+        evaluation, result = evaluate_condition_group(action_condition, job)
+
+        if evaluation:
+            workflow_action_groups[workflow_dcg.workflow_id].append(action_condition.id)
+
+    return workflow_action_groups
+
+
+def enqueue_workflows(
+    workflows: set[Workflow],
+    job: WorkflowJob,
+) -> None:
+    event = job["event"]
+    project_id = event.group.project.id
+    workflow_action_groups = get_data_condition_groups_to_fire(workflows, job)
+
+    for workflow in workflows:
+        if random.random() < 0.01:
+            logger.info(
+                "process_workflows.workflow_enqueued",
+                extra={"workflow": workflow.id, "group": event.group.id, "project": project_id},
+            )
Review thread on the sampled logger.info call:

Contributor: can we audit the logging in here? I'm not sure we need a lot of these info logs anymore (this one, for example, is being sampled to 1%, which isn't super valuable).

Author (Member): @mifu67 are the logs you've downsampled to 1% for enqueuing rules for delayed processing still useful?

Contributor: she just did that because they were noisy; I think there are a number of info logs here that we need to audit. I'd recommend only keeping things that make sense to you. If you have any questions let me know :)
+        buffer.backend.push_to_sorted_set(
+            key=WORKFLOW_ENGINE_PROJECT_ID_BUFFER_LIST_KEY, value=project_id
+        )
+
+        if_dcgs = workflow_action_groups.get(workflow.id, [])
+        if not if_dcgs:

Review thread on the if_dcgs variable:

Member: this reads a little strange - why is the var name if_dcgs? could it just be dcgs?

Author (Member): these are IF data condition groups.

Contributor: 🤔 maybe we call them workflow_action_filters? (since that should be the type of DCG here)

+            continue
+
+        if_dcg_fields = ":".join(map(str, if_dcgs))
+
+        value = json.dumps({"event_id": event.event_id, "occurrence_id": event.occurrence_id})
+        buffer.backend.push_to_hash(
+            model=Project,
+            filters={"project": project_id},
+            field=f"{workflow.id}:{event.group.id}:{if_dcg_fields}",
+            value=value,
+        )
+        metrics.incr("delayed_workflow.group_added")


+def evaluate_workflow_triggers(
+    workflows: set[Workflow], job: WorkflowJob
+) -> tuple[set[Workflow], set[Workflow]]:
     triggered_workflows: set[Workflow] = set()
+    workflows_to_enqueue: set[Workflow] = set()
 
     for workflow in workflows:
         if workflow.evaluate_trigger_conditions(job):
             triggered_workflows.add(workflow)
+        else:
+            if get_slow_conditions(workflow):
+                # enqueue to be evaluated later
+                workflows_to_enqueue.add(workflow)
 
-    return triggered_workflows
+    return triggered_workflows, workflows_to_enqueue
Review thread on evaluate_workflow_triggers:

Contributor: i don't think we should be doing filtering or anything here - this method should be a pure method to evaluate the workflow triggers and that's it; if we want to filter the workflows being evaluated, we should do that before evaluating them. Let's update the code to have process_workflows figure out what the fast / slow conditions are, then filter based on fast / slow conditions.

Author (Member): we only enqueue workflows that need to have slow conditions evaluated because they don't pass after evaluating the fast conditions alone. Are you saying to evaluate the workflows with slow conditions separately? Some of them might be triggered immediately and some of them may have to be enqueued.

Author (Member): updated to only return triggered_workflows.


@@ -39,7 +107,9 @@ def process_workflows(job: WorkflowJob) -> set[Workflow]:
 
     # Get the workflows, evaluate the when_condition_group, finally evaluate the actions for workflows that are triggered
     workflows = set(Workflow.objects.filter(detectorworkflow__detector_id=detector.id).distinct())
-    triggered_workflows = evaluate_workflow_triggers(workflows, job)
+    triggered_workflows, workflows_to_enqueue = evaluate_workflow_triggers(workflows, job)
+    enqueue_workflows(workflows_to_enqueue, job)
+
     actions = evaluate_workflow_action_filters(triggered_workflows, job)
 
     with sentry_sdk.start_span(op="workflow_engine.process_workflows.trigger_actions"):
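The buffer layout written by enqueue_workflows is easiest to see reconstructed. The sketch below derives the key/field/value shapes from the diff; all concrete ids are illustrative, and the consumer function is purely hypothetical, standing in for the delayed-processing task that this PR only enqueues work for.

```python
import json

# Per the diff: one sorted set tracks which projects have pending delayed work...
WORKFLOW_ENGINE_PROJECT_ID_BUFFER_LIST_KEY = "workflow_engine_project_id_buffer_list"

# ...and a per-project hash maps "<workflow>:<group>:<if_dcg>[:<if_dcg>...]"
# to a JSON payload identifying the event.
workflow_id, group_id, if_dcg_ids = 12, 345, [6, 7]  # illustrative ids
field = f"{workflow_id}:{group_id}:{':'.join(map(str, if_dcg_ids))}"
value = json.dumps({"event_id": "abc123", "occurrence_id": None})
assert field == "12:345:6:7"


def process_delayed_workflow_entries(entries: dict[str, str]) -> None:
    """Hypothetical consumer; not part of this PR."""
    for field, value in entries.items():
        workflow_id, group_id, *if_dcg_ids = (int(p) for p in field.split(":"))
        payload = json.loads(value)
        # A delayed task would re-evaluate the slow WHEN conditions against
        # Snuba here and, on success, fire the stored IF condition groups.
        ...
```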