feat(aci): enqueue workflows for delayed processing #83548
Conversation
Codecov Report
✅ All tests successful. No failed tests found.
Additional details and impacted files

@@ Coverage Diff @@
## master   #83548      +/-   ##
==========================================
+ Coverage   87.54%   87.60%   +0.05%
==========================================
  Files        9408     9489      +81
  Lines      537825   539196    +1371
  Branches    21176    21176
==========================================
+ Hits       470859   472370    +1511
+ Misses      66618    66478     -140
  Partials      348      348
overall lgtm
i think the biggest change here is that we should not be filtering the workflows when we are trying to evaluate them. instead, we should filter before invoking evaluate. by filtering inside of the evaluation, it means we wouldn't be able to re-use this evaluation method in slow processing.
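A minimal sketch of that separation, with hypothetical stand-ins for `Workflow` and `get_slow_conditions` (only the partitioning-before-evaluation idea comes from the review; the rest is illustrative):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Workflow:
    # stand-in for Sentry's Workflow model, just enough for the sketch
    id: int
    slow_conditions: tuple[str, ...] = ()


def get_slow_conditions(workflow: Workflow) -> tuple[str, ...]:
    # stand-in for the PR's helper that inspects the workflow's condition groups
    return workflow.slow_conditions


def partition_workflows(workflows: set[Workflow]) -> tuple[set[Workflow], set[Workflow]]:
    """Split workflows into (fast-only, has-slow-conditions) *before* evaluation,
    keeping the trigger-evaluation function pure and reusable in slow processing."""
    slow = {w for w in workflows if get_slow_conditions(w)}
    return workflows - slow, slow
```

The evaluation method then only ever sees the fast set, so the same method can be invoked again from delayed processing against the slow set.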
if random.random() < 0.01:
    logger.info(
        "process_workflows.workflow_enqueued",
        extra={"workflow": workflow.id, "group": event.group.id, "project": project_id},
    )
can we audit the logging in here? I'm not sure we need a lot of these info logs anymore (this one for example is being sampled to 1% - which isn't super valuable)
@mifu67 are the logs you've downsampled to 1% for enqueue rules for delayed processing still useful?
she just did that cause they were noisy, i think there are a number of info logs here that we need to audit.
I'd recommend only keeping things that make sense to you. if you have any questions lemme know :)
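For context, a self-contained sketch of what the 1%-sampled info log amounts to (the logger name and sample-rate constant are illustrative); making the rate a parameter at least keeps it testable:

```python
import logging
import random

logger = logging.getLogger("sentry.workflow_engine")
ENQUEUE_LOG_SAMPLE_RATE = 0.01  # the 1% sampling the reviewers are questioning


def log_workflow_enqueued(
    workflow_id: int,
    group_id: int,
    project_id: int,
    sample_rate: float = ENQUEUE_LOG_SAMPLE_RATE,
) -> None:
    # at 1% this mostly confirms "the code path runs at all", which is
    # usually better served by a metric than by a sampled info log
    if random.random() < sample_rate:
        logger.info(
            "process_workflows.workflow_enqueued",
            extra={"workflow": workflow_id, "group": group_id, "project": project_id},
        )
```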
def evaluate_workflow_triggers(
    workflows: set[Workflow], job: WorkflowJob
) -> tuple[set[Workflow], set[Workflow]]:
    triggered_workflows: set[Workflow] = set()
    workflows_to_enqueue: set[Workflow] = set()

    for workflow in workflows:
        if workflow.evaluate_trigger_conditions(job):
            triggered_workflows.add(workflow)
        else:
            if get_slow_conditions(workflow):
                # enqueue to be evaluated later
                workflows_to_enqueue.add(workflow)

-    return triggered_workflows
+    return triggered_workflows, workflows_to_enqueue
i don't think we should be doing filtering or anything here - this method should be a pure method to evaluate the workflow triggers and that's it; if we want to filter the workflows being evaluated we should do that before evaluating them.
let's update the code to have process_workflows figure out what is fast / slow conditions, then filter based on fast / slow conditions.
we only enqueue workflows that need to have slow conditions evaluated because they don't pass after evaluating the fast conditions alone. are you saying to evaluate the workflows with slow conditions separately? some of them might be triggered immediately and some of them may have to be enqueued
updated to only return triggered_workflows
…ate slow conditions when the data is available
@@ -59,7 +59,7 @@ def get_result(model: TSDBModel, group_ids: list[int]) -> dict[int, int]:

 @condition_handler_registry.register(Condition.EVENT_FREQUENCY_COUNT)
-class EventFrequencyCountHandler(EventFrequencyConditionHandler, DataConditionHandler[int]):
+class EventFrequencyCountHandler(EventFrequencyConditionHandler, DataConditionHandler[WorkflowJob]):
update this to evaluate WorkflowJob so we can reuse evaluate_workflow_triggers in delayed processing, and populate snuba_results inside WorkflowJob after we make the snuba queries
-def evaluate_value(value: list[int], comparison: Any) -> DataConditionResult:
-    if len(value) != 2:
+def evaluate_value(value: WorkflowJob, comparison: Any) -> DataConditionResult:
+    if len(value.get("snuba_results", [])) != 2:
is this a common scenario or a weird snuba blip?
it's possible we don't have the snuba results when we are evaluating the triggers outside of delayed processing
buffer.backend.push_to_sorted_set(key=WORKFLOW_ENGINE_BUFFER_LIST_KEY, value=project_id)

if_dcgs = workflow_action_groups.get(workflow.id, [])
if not if_dcgs:
this reads a little strange - why is the var name if_dcgs? could it just be dcgs?
these are IF data condition groups
🤔 maybe we call them workflow_action_filters? (since that should be the type of DCG here)
overall, lgtm. i think we can do a bit more cleanup here, but i don't think we need to block on that.
🙏 thanks for addressing the feedback!
# enqueue to be evaluated later
workflows_to_enqueue.add(workflow)

enqueue_workflows(workflows_to_enqueue, job)
nit: save the couple of cpu cycles and only enqueue if we have something to enqueue
-enqueue_workflows(workflows_to_enqueue, job)
+if workflows_to_enqueue:
+    enqueue_workflows(workflows_to_enqueue, job)
src/sentry/workflow_engine/types.py
Outdated
@@ -40,6 +40,7 @@ class WorkflowJob(EventJob, total=False):
     has_alert: bool
     has_escalated: bool
     workflow: Workflow
+    snuba_results: list[int]
🤔 is the value in this that we can re-use the evaluate_workflows method?
i think this is a bit of a smell that the abstraction might not be quite right in either delayed processing or the evaluate_workflow_triggers 🤔
mind adding a TODO here so i can come back and take a look? i'm not sure if this is the best approach, but seems okay for now.
yeah the value is that we can at least use the evaluate_workflow_triggers function. we'll already have processed the actions we can possibly fire before enqueuing, so all we need to do is process the slow conditions, but i'm also not sure if it's the best way to do it
Adds the following logic to account for delayed processing of slow conditions:
- DataConditionGroup (DCG): note the workflows that need to have their slow condition(s) checked before proceeding
- DataConditionGroupAction: this is so we can only evaluate slow conditions in delayed processing.
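The enqueue path can be sketched end to end with an in-memory stand-in for Sentry's redis-backed buffer (the constant and method names mirror the diff above; the buffer class itself is hypothetical), including the review nit of skipping the push when there is nothing to enqueue:

```python
import time

WORKFLOW_ENGINE_BUFFER_LIST_KEY = "workflow_engine_project_ids"


class InMemorySortedSetBuffer:
    """Hypothetical stand-in for buffer.backend in the PR."""

    def __init__(self) -> None:
        self.sorted_sets: dict[str, dict[int, float]] = {}

    def push_to_sorted_set(self, key: str, value: int) -> None:
        # score by timestamp so delayed processing can drain oldest projects first
        self.sorted_sets.setdefault(key, {})[value] = time.time()


buffer = InMemorySortedSetBuffer()


def enqueue_workflows(workflows_to_enqueue: set, project_id: int) -> None:
    # in the PR the project id comes off the job; here it's passed directly
    if not workflows_to_enqueue:  # the review nit: skip the no-op push
        return
    buffer.push_to_sorted_set(key=WORKFLOW_ENGINE_BUFFER_LIST_KEY, value=project_id)
```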