From afe88ab17c0fa3ec21c46f51ae1ab442b971d923 Mon Sep 17 00:00:00 2001 From: getsentry-bot Date: Tue, 4 Jun 2024 17:04:57 +0000 Subject: [PATCH] Revert "ref: Remove LPQ scanning (#71908)" This reverts commit aa4745a13515dba4e98b95d311b7bc1b99b177e1. Co-authored-by: asottile-sentry <103459774+asottile-sentry@users.noreply.github.com> --- .github/CODEOWNERS | 2 + src/sentry/api/serializers/models/project.py | 7 +- src/sentry/conf/server.py | 1 + .../processing/realtime_metrics/__init__.py | 5 + .../processing/realtime_metrics/base.py | 59 ++++ .../processing/realtime_metrics/dummy.py | 16 + src/sentry/processing/realtime_metrics/pb.py | 16 + .../processing/realtime_metrics/redis.py | 145 ++++++-- .../tasks/low_priority_symbolication.py | 153 ++++++++ .../processing/realtime_metrics/test_redis.py | 327 ++++++++++++++++-- .../tasks/test_low_priority_symbolication.py | 152 ++++++++ 11 files changed, 827 insertions(+), 56 deletions(-) create mode 100644 src/sentry/tasks/low_priority_symbolication.py create mode 100644 tests/sentry/tasks/test_low_priority_symbolication.py diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index d1ecb77782b40c..0e21cb024fca12 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -405,8 +405,10 @@ static/app/components/events/eventStatisticalDetector/ @getse /src/sentry/processing/realtime_metrics/ @getsentry/owners-native /src/sentry/tasks/app_store_connect.py @getsentry/owners-native /src/sentry/tasks/assemble.py @getsentry/owners-native +/src/sentry/tasks/low_priority_symbolication.py @getsentry/owners-native /src/sentry/tasks/symbolication.py @getsentry/owners-native /src/sentry/utils/appleconnect/ @getsentry/owners-native +/tests/sentry/tasks/test_low_priority_symbolication.py @getsentry/owners-native /src/sentry/tasks/reprocessing.py @getsentry/processing /src/sentry/tasks/reprocessing2.py @getsentry/processing /src/sentry/reprocessing2.py @getsentry/processing diff --git a/src/sentry/api/serializers/models/project.py 
b/src/sentry/api/serializers/models/project.py index f98656bba8e86a..ba13c3a0623dd1 100644 --- a/src/sentry/api/serializers/models/project.py +++ b/src/sentry/api/serializers/models/project.py @@ -41,6 +41,7 @@ from sentry.processing import realtime_metrics from sentry.roles import organization_roles from sentry.snuba import discover +from sentry.tasks.symbolication import should_demote_symbolication STATUS_LABELS = { ObjectStatus.ACTIVE: "active", @@ -678,14 +679,16 @@ def get_attrs( if not self._collapse(LATEST_DEPLOYS_KEY): deploys_by_project = self.get_deploys_by_project(item_list) + with sentry_sdk.start_span(op="project_summary_serializer.get_lpq_projects"): + lpq_projects = realtime_metrics.get_lpq_projects() for item in item_list: attrs[item]["latest_release"] = latest_release_versions.get(item.id) attrs[item]["environments"] = environments_by_project.get(item.id, []) attrs[item]["has_user_reports"] = item.id in projects_with_user_reports if not self._collapse(LATEST_DEPLOYS_KEY): attrs[item]["deploys"] = deploys_by_project.get(item.id) - attrs[item]["symbolication_degraded"] = realtime_metrics.is_lpq_project( - project_id=item.id + attrs[item]["symbolication_degraded"] = should_demote_symbolication( + project_id=item.id, lpq_projects=lpq_projects, emit_metrics=False ) return attrs diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 5ea2d522c5cd24..51d184c5568a21 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -760,6 +760,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str: "sentry.tasks.files", "sentry.tasks.groupowner", "sentry.tasks.integrations", + "sentry.tasks.low_priority_symbolication", "sentry.tasks.merge", "sentry.tasks.options", "sentry.tasks.ping", diff --git a/src/sentry/processing/realtime_metrics/__init__.py b/src/sentry/processing/realtime_metrics/__init__.py index a72e4fdfed7cb6..27b2d5980eb43b 100644 --- a/src/sentry/processing/realtime_metrics/__init__.py +++ 
b/src/sentry/processing/realtime_metrics/__init__.py @@ -19,4 +19,9 @@ __realtime_metrics_store__ = RealtimeMetricsStore() validate = __realtime_metrics_store__.validate record_project_duration = __realtime_metrics_store__.record_project_duration + projects = __realtime_metrics_store__.projects + get_used_budget_for_project = __realtime_metrics_store__.get_used_budget_for_project + get_lpq_projects = __realtime_metrics_store__.get_lpq_projects is_lpq_project = __realtime_metrics_store__.is_lpq_project + add_project_to_lpq = __realtime_metrics_store__.add_project_to_lpq + remove_projects_from_lpq = __realtime_metrics_store__.remove_projects_from_lpq diff --git a/src/sentry/processing/realtime_metrics/base.py b/src/sentry/processing/realtime_metrics/base.py index f9086d1c660865..f9920ef1fffb41 100644 --- a/src/sentry/processing/realtime_metrics/base.py +++ b/src/sentry/processing/realtime_metrics/base.py @@ -1,3 +1,5 @@ +from collections.abc import Iterable + from sentry.utils.services import Service @@ -7,7 +9,12 @@ class RealtimeMetricsStore(Service): __all__ = ( "validate", "record_project_duration", + "projects", + "get_used_budget_for_project", + "get_lpq_projects", "is_lpq_project", + "add_project_to_lpq", + "remove_projects_from_lpq", ) def validate(self) -> None: @@ -46,8 +53,60 @@ def record_project_duration(self, project_id: int, duration: float) -> None: """ raise NotImplementedError + def projects(self) -> Iterable[int]: + """ + Returns IDs of all projects that should be considered for the low priority queue. + """ + raise NotImplementedError + + def get_used_budget_for_project(self, project_id: int) -> float: + """ + Returns the average used per-second budget for a given project during the configured sliding time window. + """ + raise NotImplementedError + + def get_lpq_projects(self) -> set[int]: + """ + Fetches the list of projects that are currently using the low priority queue. + + Returns a list of project IDs. 
+ """ + raise NotImplementedError + def is_lpq_project(self, project_id: int) -> bool: """ Checks whether the given project is currently using the low priority queue. """ raise NotImplementedError + + def add_project_to_lpq(self, project_id: int) -> bool: + """ + Assigns a project to the low priority queue. + + This registers an intent to redirect all symbolication events triggered by the specified + project to be redirected to the low priority queue. + + Applies a backoff timer to the project which prevents it from being automatically evicted + from the queue while that timer is active. + + Returns True if the project was a new addition to the list. Returns False if it was already + assigned to the low priority queue. This may throw an exception if there is some sort of + issue registering the project with the queue. + """ + raise NotImplementedError + + def remove_projects_from_lpq(self, project_ids: set[int]) -> int: + """ + Unassigns projects from the low priority queue. + + This registers an intent to restore all specified projects back to the regular queue. + + Applies a backoff timer to the project which prevents it from being automatically assigned + to the queue while that timer is active. + + Returns the number of projects that were actively removed from the queue. Any projects that + were not assigned to the low priority queue to begin with will be omitted from the return + value. This may throw an exception if there is some sort of issue deregistering the projects + from the queue. + """ + raise NotImplementedError diff --git a/src/sentry/processing/realtime_metrics/dummy.py b/src/sentry/processing/realtime_metrics/dummy.py index 2337cbf3155e1b..1f09359d10bc67 100644 --- a/src/sentry/processing/realtime_metrics/dummy.py +++ b/src/sentry/processing/realtime_metrics/dummy.py @@ -1,4 +1,5 @@ import logging +from collections.abc import Iterable from typing import Any from . 
import base @@ -16,5 +17,20 @@ def validate(self) -> None: def record_project_duration(self, project_id: int, duration: float) -> None: pass + def projects(self) -> Iterable[int]: + yield from () + + def get_used_budget_for_project(self, project_id: int) -> float: + return 0.0 + + def get_lpq_projects(self) -> set[int]: + return set() + def is_lpq_project(self, project_id: int) -> bool: return False + + def add_project_to_lpq(self, project_id: int) -> bool: + return False + + def remove_projects_from_lpq(self, project_ids: set[int]) -> int: + return 0 diff --git a/src/sentry/processing/realtime_metrics/pb.py b/src/sentry/processing/realtime_metrics/pb.py index 9da4cea0537904..4439a6de1bc551 100644 --- a/src/sentry/processing/realtime_metrics/pb.py +++ b/src/sentry/processing/realtime_metrics/pb.py @@ -1,4 +1,5 @@ import logging +from collections.abc import Iterable from urllib.parse import urljoin from requests import RequestException @@ -57,3 +58,18 @@ def is_lpq_project(self, project_id: int) -> bool: def validate(self) -> None: pass + + def projects(self) -> Iterable[int]: + yield from () + + def get_used_budget_for_project(self, project_id: int) -> float: + return 0.0 + + def get_lpq_projects(self) -> set[int]: + return set() + + def add_project_to_lpq(self, project_id: int) -> bool: + return False + + def remove_projects_from_lpq(self, project_ids: set[int]) -> int: + return 0 diff --git a/src/sentry/processing/realtime_metrics/redis.py b/src/sentry/processing/realtime_metrics/redis.py index de1a7d1739bc02..ec3469f07ede1e 100644 --- a/src/sentry/processing/realtime_metrics/redis.py +++ b/src/sentry/processing/realtime_metrics/redis.py @@ -1,19 +1,16 @@ import logging +from collections.abc import Iterable, Sequence from time import time -from django.conf import settings - from sentry.exceptions import InvalidConfiguration from sentry.utils import redis from . 
import base -logger = logging.getLogger(__name__) +# redis key for entry storing current list of LPQ members +LPQ_MEMBERS_KEY = "store.symbolicate-event-lpq-selected" -# Redis key prefix for storing the LPQ status of projects -MEMBER_KEY_PREFIX = "symbolicate_event_low_priority:members" -# Redis key prefix for storing the budgets of projects -BUDGET_KEY_PREFIX = "symbolicate_event_low_priority:budget" +logger = logging.getLogger(__name__) class RedisRealtimeMetricsStore(base.RealtimeMetricsStore): @@ -37,6 +34,7 @@ def __init__( self.cluster = redis.redis_clusters.get(cluster) self._budget_bucket_size = budget_bucket_size self._budget_time_window = budget_time_window + self._prefix = "symbolicate_event_low_priority" self._backoff_timer = backoff_timer self.validate() @@ -48,11 +46,31 @@ def validate(self) -> None: if self._budget_time_window < 60: raise InvalidConfiguration("budget time window must be at least a minute") - if self._backoff_timer < 1: - raise InvalidConfiguration("backoff timer must be at least a second") - def _budget_key_prefix(self) -> str: - return f"{BUDGET_KEY_PREFIX}:{self._budget_bucket_size}" + return f"{self._prefix}:budget:{self._budget_bucket_size}" + + def _backoff_key_prefix(self) -> str: + return f"{self._prefix}:backoff" + + def _register_backoffs(self, project_ids: Sequence[int]) -> None: + if len(project_ids) == 0 or self._backoff_timer == 0: + return + + with self.cluster.pipeline(transaction=False) as pipeline: + for project_id in project_ids: + key = f"{self._backoff_key_prefix()}:{project_id}" + # Can't use mset because it doesn't allow also specifying an expiry + pipeline.set(name=key, value="1", ex=self._backoff_timer) + + pipeline.execute() + + def _is_backing_off(self, project_id: int) -> bool: + """ + Returns whether a project is currently in the middle of its backoff timer from having + recently been assigned to or unassigned from the LPQ. 
+ """ + key = f"{self._backoff_key_prefix()}:{project_id}" + return self.cluster.get(key) is not None def record_project_duration(self, project_id: int, duration: float) -> None: """ @@ -96,9 +114,31 @@ def record_project_duration(self, project_id: int, duration: float) -> None: pipeline.expire(key, self._budget_time_window + self._budget_bucket_size) pipeline.execute() - def is_lpq_project(self, project_id: int) -> bool: + def projects(self) -> Iterable[int]: """ - Checks whether the given project is currently using the low priority queue. + Returns IDs of all projects for which metrics have been recorded in the store. + + This may throw an exception if there is some sort of issue scanning the redis store for + projects. + """ + + already_seen = set() + all_keys = self.cluster.scan_iter( + match=self._budget_key_prefix() + ":*", + ) + + for item in all_keys: + # Because this could be one of two patterns, this splits based on the most basic + # delimiter ":" instead of splitting on known prefixes + _prefix, _metric_type, _bucket_size, project_id_raw, _else = item.split(":", maxsplit=4) + project_id = int(project_id_raw) + if project_id not in already_seen: + already_seen.add(project_id) + yield project_id + + def get_used_budget_for_project(self, project_id: int) -> float: + """ + Returns the average used per-second budget for a given project during the configured sliding time window. 
""" timestamp = int(time()) @@ -110,23 +150,74 @@ def is_lpq_project(self, project_id: int) -> bool: buckets = range(first_bucket, now_bucket + bucket_size, bucket_size) keys = [f"{self._budget_key_prefix()}:{project_id}:{ts}" for ts in buckets] - member_key = f"{MEMBER_KEY_PREFIX}:{project_id}" - keys.insert(0, member_key) - results = self.cluster.mget(keys) - is_lpq = results[0] - counts = results[1:] - - if is_lpq is not None: - return True + counts = self.cluster.mget(keys) total_time_window = timestamp - first_bucket total_sum = sum(int(c) if c else 0 for c in counts) # the counts in redis are in ms resolution. - average_used = total_sum / total_time_window / 1000 - budget = settings.SENTRY_LPQ_OPTIONS["project_budget"] - new_is_lpq = average_used > budget + return total_sum / total_time_window / 1000 + + def get_lpq_projects(self) -> set[int]: + """ + Fetches the list of projects that are currently using the low priority queue. + + Returns a list of project IDs. + + This may throw an exception if there is some sort of issue fetching the list from the redis + store. + """ + return {int(project_id) for project_id in self.cluster.smembers(LPQ_MEMBERS_KEY)} + + def is_lpq_project(self, project_id: int) -> bool: + """ + Checks whether the given project is currently using the low priority queue. + """ + return bool(self.cluster.sismember(LPQ_MEMBERS_KEY, project_id)) + + def add_project_to_lpq(self, project_id: int) -> bool: + """ + Assigns a project to the low priority queue. + + This registers an intent to redirect all symbolication events triggered by the specified + project to be redirected to the low priority queue. + + Applies a backoff timer to the project which prevents it from being automatically evicted + from the queue while that timer is active. + + Returns True if the project was a new addition to the list. Returns False if it was already + assigned to the low priority queue. 
This may throw an exception if there is some sort of + issue registering the project with the queue. + """ + + if self._is_backing_off(project_id): + return False + # If this successfully completes then the project is expected to be in the set. + was_added = int(self.cluster.sadd(LPQ_MEMBERS_KEY, project_id)) > 0 + self._register_backoffs([project_id]) + return was_added + + def remove_projects_from_lpq(self, project_ids: set[int]) -> int: + """ + Unassigns projects from the low priority queue. + + This registers an intent to restore all specified projects back to the regular queue. + + Applies a backoff timer to the project which prevents it from being automatically assigned + to the queue while that timer is active. + + Returns the number of projects that were actively removed from the queue. Any projects that + were not assigned to the low priority queue to begin with will be omitted from the return + value. This may throw an exception if there is some sort of issue deregistering the projects + from the queue. + """ + removable = [project for project in project_ids if not self._is_backing_off(project)] + + if not removable: + return 0 - if new_is_lpq: - self.cluster.set(name=member_key, value="1", ex=self._backoff_timer) - return new_is_lpq + # This returns the number of projects removed, and throws an exception if there's a problem. + # If this successfully completes then the projects are expected to no longer be in the set. + removed = int(self.cluster.srem(LPQ_MEMBERS_KEY, *removable)) + self._register_backoffs(removable) + return removed diff --git a/src/sentry/tasks/low_priority_symbolication.py b/src/sentry/tasks/low_priority_symbolication.py new file mode 100644 index 00000000000000..29f4c1fd414a55 --- /dev/null +++ b/src/sentry/tasks/low_priority_symbolication.py @@ -0,0 +1,153 @@ +""" +Tasks that automate the job of moving projects in and out of symbolicator's low priority queue based +on symbolication metrics stored in Redis. 
+ +This has three major tasks, executed in the following general order: +1. Scan for new suspect projects in Redis that need to be checked for LPQ eligibility. Triggers 2 and 3. +2. Determine a project's eligibility for the LPQ based on their recorded metrics. +3. Remove some specified project from the LPQ. +""" + +import logging +from typing import Literal + +import sentry_sdk +from django.conf import settings + +from sentry import options +from sentry.killswitches import normalize_value +from sentry.processing import realtime_metrics +from sentry.tasks.base import instrumented_task +from sentry.utils import metrics + +logger = logging.getLogger(__name__) + + +@instrumented_task( + name="sentry.tasks.low_priority_symbolication.scan_for_suspect_projects", + queue="symbolications.compute_low_priority_projects", + ignore_result=True, + soft_time_limit=10, +) +def scan_for_suspect_projects() -> None: + """Scans and updates the list of projects assigned to the low priority queue.""" + try: + _scan_for_suspect_projects() + finally: + _record_metrics() + + +def _scan_for_suspect_projects() -> None: + suspect_projects = set() + + for project_id in realtime_metrics.projects(): + suspect_projects.add(project_id) + update_lpq_eligibility.delay(project_id=project_id) + + # Prune projects we definitely know shouldn't be in the queue any more. + # `update_lpq_eligibility` should handle removing suspect projects from the list if it turns + # out they need to be evicted. 
+ current_lpq_projects = realtime_metrics.get_lpq_projects() or set() + expired_projects = current_lpq_projects.difference(suspect_projects) + if not expired_projects: + return + + realtime_metrics.remove_projects_from_lpq(expired_projects) + + for project_id in expired_projects: + _report_change( + project_id=project_id, change="removed", reason="no metrics", used_budget=0.0 + ) + + +@instrumented_task( + name="sentry.tasks.low_priority_symbolication.update_lpq_eligibility", + queue="symbolications.compute_low_priority_projects", + ignore_result=True, + soft_time_limit=10, +) +def update_lpq_eligibility(project_id: int) -> None: + """ + Given a project ID, determines whether the project belongs in the low priority queue and + removes or assigns it accordingly to the low priority queue. + """ + _update_lpq_eligibility(project_id) + + +def _update_lpq_eligibility(project_id: int) -> None: + used_budget = realtime_metrics.get_used_budget_for_project(project_id) + + # NOTE: tagging this metrics with `tags={"project_id": project_id}` would + # have too excessive cardinality to use in production. 
+ metrics.distribution("symbolication.lpq.computation.used_budget", used_budget) + + options = settings.SENTRY_LPQ_OPTIONS + exceeds_budget = used_budget > options["project_budget"] + + if exceeds_budget: + was_added = realtime_metrics.add_project_to_lpq(project_id) + if was_added: + _report_change( + project_id=project_id, change="added", reason="budget", used_budget=used_budget + ) + else: + was_removed = realtime_metrics.remove_projects_from_lpq({project_id}) + if was_removed: + _report_change( + project_id=project_id, + change="removed", + reason="ineligible", + used_budget=used_budget, + ) + + +def _report_change( + project_id: int, change: Literal["added", "removed"], reason: str, used_budget: float +) -> None: + if not reason: + reason = "unknown" + + if change == "added": + message = "Added project to symbolicator's low priority queue" + else: + message = "Removed project from symbolicator's low priority queue" + + with sentry_sdk.push_scope() as scope: + scope.set_level("warning") + scope.set_tag("project", project_id) + scope.set_tag("lpq_reason", reason) + scope.set_extra("used_budget", used_budget) + sentry_sdk.capture_message(message) + + +def _record_metrics() -> None: + project_count = len(realtime_metrics.get_lpq_projects()) + metrics.gauge( + "tasks.store.symbolicate_event.low_priority.projects.auto", + project_count, + ) + + # The manual kill switch is a list of configurations where each config item corresponds to one + # project affected by the switch. The general idea is to grab the raw option, validate its + # contents, and then assume that the length of the validated list corresponds to the number of + # projects in that switch. 
+ + always_included_raw = options.get( + "store.symbolicate-event-lpq-always", + ) + always_included = len( + normalize_value("store.symbolicate-event-lpq-always", always_included_raw) + ) + metrics.gauge( + "tasks.store.symbolicate_event.low_priority.projects.manual.always", + always_included, + ) + + never_included_raw = options.get( + "store.symbolicate-event-lpq-never", + ) + never_included = len(normalize_value("store.symbolicate-event-lpq-never", never_included_raw)) + metrics.gauge( + "tasks.store.symbolicate_event.low_priority.projects.manual.never", + never_included, + ) diff --git a/tests/sentry/processing/realtime_metrics/test_redis.py b/tests/sentry/processing/realtime_metrics/test_redis.py index b5f0229f643f04..33bbc3a4d3fd6b 100644 --- a/tests/sentry/processing/realtime_metrics/test_redis.py +++ b/tests/sentry/processing/realtime_metrics/test_redis.py @@ -1,4 +1,3 @@ -import random from datetime import datetime from typing import Any @@ -76,37 +75,311 @@ def test_record_project_duration_different_buckets( assert redis_cluster.get("symbolicate_event_low_priority:budget:10:17:1150") == "1000" -def test_is_lpq_spike(store: RedisRealtimeMetricsStore) -> None: - assert not store.is_lpq_project(17) +# +# get_lpq_projects() +# - store.record_project_duration(17, 1000000.0) - assert store.is_lpq_project(17) +def test_get_lpq_projects_unset(store: RedisRealtimeMetricsStore) -> None: + in_lpq = store.get_lpq_projects() + assert in_lpq == set() -def test_is_lpq_gradual(store: RedisRealtimeMetricsStore) -> None: - with freeze_time(datetime.fromtimestamp(1147)) as frozen_datetime: - for _ in range(60): - delta = random.randint(1, 6) - used = 5.5 * delta - store.record_project_duration(17, used) - frozen_datetime.shift(delta) - assert store.is_lpq_project(17) +def test_get_lpq_projects_empty( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 1) + 
redis_cluster.srem("store.symbolicate-event-lpq-selected", 1) + + in_lpq = store.get_lpq_projects() + assert in_lpq == set() -def test_not_lpq_spike(store: RedisRealtimeMetricsStore) -> None: - assert not store.is_lpq_project(17) - # just under the entire budget for 2min - used = 5.0 * 115 - store.record_project_duration(17, used) - assert not store.is_lpq_project(17) +def test_get_lpq_projects_filled( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 1) + in_lpq = store.get_lpq_projects() + assert in_lpq == {1} -def test_not_lpq_gradual(store: RedisRealtimeMetricsStore) -> None: - with freeze_time(datetime.fromtimestamp(1147)) as frozen_datetime: - for _ in range(60): - delta = random.randint(1, 6) - used = 4.5 * delta - store.record_project_duration(17, used) - frozen_datetime.shift(delta) - assert not store.is_lpq_project(17) +# +# add_project_to_lpq() +# + + +def test_add_project_to_lpq_unset( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + added = store.add_project_to_lpq(1) + assert added + in_lpq = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert in_lpq == {"1"} + + +def test_add_project_to_lpq_empty( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 1) + redis_cluster.srem("store.symbolicate-event-lpq-selected", 1) + + added = store.add_project_to_lpq(1) + assert added + in_lpq = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert in_lpq == {"1"} + + +def test_add_project_to_lpq_dupe( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 1) + + added = store.add_project_to_lpq(1) + assert not added + in_lpq = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert in_lpq == {"1"} + + +def test_add_project_to_lpq_filled( + 
store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 11) + + added = store.add_project_to_lpq(1) + assert added + in_lpq = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert in_lpq == {"1", "11"} + + +def test_add_project_to_lpq_backing_off_adding( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + redis_cluster.set(f"{store._backoff_key_prefix()}:1", 1) + + added = store.add_project_to_lpq(1) + assert not added + + +def test_add_project_to_lpq_backing_off_readding( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + store.add_project_to_lpq(1) + in_lpq = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert in_lpq == {"1"} + assert redis_cluster.get(f"{store._backoff_key_prefix()}:1") == "1" + + added = store.add_project_to_lpq(1) + assert not added + + +# +# remove_projects_from_lpq() +# + + +def test_remove_projects_from_lpq_unset( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + removed = store.remove_projects_from_lpq({1}) + assert removed == 0 + + remaining = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert remaining == set() + + +def test_remove_projects_from_lpq_empty( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 1) + redis_cluster.srem("store.symbolicate-event-lpq-selected", 1) + + removed = store.remove_projects_from_lpq({1}) + assert removed == 0 + remaining = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert remaining == set() + + +def test_remove_projects_from_lpq_only_member( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 1) + + removed = store.remove_projects_from_lpq({1}) + assert removed == 1 + + remaining = 
redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert remaining == set() + + +def test_remove_projects_from_lpq_nonmember( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 11) + + removed = store.remove_projects_from_lpq({1}) + assert removed == 0 + + remaining = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert remaining == {"11"} + + +def test_remove_projects_from_lpq_subset( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 1) + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 11) + + removed = store.remove_projects_from_lpq({1}) + assert removed == 1 + + remaining = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert remaining == {"11"} + + +def test_remove_projects_from_lpq_all_members( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 1) + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 11) + + removed = store.remove_projects_from_lpq({1, 11}) + assert removed == 2 + + remaining = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert remaining == set() + + +def test_remove_projects_from_lpq_no_members( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + redis_cluster.sadd("store.symbolicate-event-lpq-selected", 1) + + removed = store.remove_projects_from_lpq(set()) + assert removed == 0 + + remaining = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert remaining == {"1"} + + +def test_remove_projects_from_lpq_backing_off_removing( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + store.add_project_to_lpq(1) + in_lpq = redis_cluster.smembers("store.symbolicate-event-lpq-selected") + assert in_lpq == {"1"} + assert 
redis_cluster.get(f"{store._backoff_key_prefix()}:1") == "1" + + removed = store.remove_projects_from_lpq({1}) + assert not removed + + +def test_remove_projects_from_lpq_backing_off_reremoving( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + redis_cluster.set(f"{store._backoff_key_prefix()}:1", 1) + + removed = store.remove_projects_from_lpq({1}) + assert not removed + + +# +# projects() +# + + +def test_projects_unset(store: RedisRealtimeMetricsStore) -> None: + candidates = store.projects() + assert list(candidates) == [] + + +def test_projects_empty(store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis) -> None: + redis_cluster.set( + "symbolicate_event_low_priority:budget:10:42:111", + 0, + ) + redis_cluster.delete("symbolicate_event_low_priority:budget:10:42:111") + + candidates = store.projects() + assert list(candidates) == [] + + +def test_projects_different_bucket( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + redis_cluster.set("symbolicate_event_low_priority:budget:5:42:111", 0) + + candidates = store.projects() + assert list(candidates) == [] + + +def test_projects_negative_timestamp( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + redis_cluster.set("symbolicate_event_low_priority:budget:10:42:-111", 0) + + candidates = store.projects() + assert list(candidates) == [42] + + +def test_projects_one_budget(store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis) -> None: + redis_cluster.set("symbolicate_event_low_priority:budget:10:42:111", 0) + + candidates = store.projects() + assert list(candidates) == [42] + + +def test_projects_mixed_buckets( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + redis_cluster.set("symbolicate_event_low_priority:budget:10:42:111", 0) + redis_cluster.set("symbolicate_event_low_priority:budget:5:53:111", 0) + + candidates = store.projects() + assert list(candidates) == [42] + + +# +# 
get_used_budget_for_project() +# + + +def test_get_used_budget_for_project_unset(store: RedisRealtimeMetricsStore) -> None: + budget = store.get_used_budget_for_project(project_id=42) + + assert budget == 0 + + +def test_get_used_budget_for_project_missing_project( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + redis_cluster.set("symbolicate_event_low_priority:budget:10:53:111", 0) + + with freeze_time(datetime.fromtimestamp(113)): + budget = store.get_used_budget_for_project(project_id=42) + + assert budget == 0 + + +def test_get_used_budget_for_project_different_bucket_sizes( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + redis_cluster.set("symbolicate_event_low_priority:budget:10:42:110", 1000 * 120) + redis_cluster.set("symbolicate_event_low_priority:budget:5:42:110", 2000) + + with freeze_time(datetime.fromtimestamp(113)): + budget = store.get_used_budget_for_project(project_id=42) + + assert round(budget) == 1 + + +def test_get_used_budget_for_projects_with_gap( + store: RedisRealtimeMetricsStore, redis_cluster: StrictRedis +) -> None: + store._budget_time_window = 40 + redis_cluster.set("symbolicate_event_low_priority:budget:10:42:110", 3000 * 40) + redis_cluster.set("symbolicate_event_low_priority:budget:10:42:150", 17000 * 40) + + with freeze_time(datetime.fromtimestamp(151)): + budget = store.get_used_budget_for_project(project_id=42) + + assert round(budget) == 20 diff --git a/tests/sentry/tasks/test_low_priority_symbolication.py b/tests/sentry/tasks/test_low_priority_symbolication.py new file mode 100644 index 00000000000000..40e0015622759e --- /dev/null +++ b/tests/sentry/tasks/test_low_priority_symbolication.py @@ -0,0 +1,152 @@ +from __future__ import annotations + +from collections.abc import Generator +from datetime import datetime +from typing import TYPE_CHECKING +from unittest import mock + +import pytest + +from sentry.processing import realtime_metrics +from 
from __future__ import annotations

from collections.abc import Generator
from datetime import datetime
from typing import TYPE_CHECKING
from unittest import mock

import pytest

from sentry.processing import realtime_metrics
from sentry.processing.realtime_metrics.base import RealtimeMetricsStore
from sentry.processing.realtime_metrics.redis import RedisRealtimeMetricsStore
from sentry.tasks import low_priority_symbolication
from sentry.tasks.low_priority_symbolication import (
    _scan_for_suspect_projects,
    _update_lpq_eligibility,
)
from sentry.testutils.helpers.datetime import freeze_time
from sentry.testutils.helpers.task_runner import TaskRunner
from sentry.utils.services import LazyServiceWrapper

if TYPE_CHECKING:  # TODO: pytest 7.x
    from _pytest.monkeypatch import MonkeyPatch


@pytest.fixture
def store() -> Generator[RealtimeMetricsStore, None, None]:
    """Expose a redis-backed store through the ``realtime_metrics`` service
    module and restore the module's original attributes afterwards."""
    wrapper = LazyServiceWrapper(
        RealtimeMetricsStore,
        "sentry.processing.realtime_metrics.redis.RedisRealtimeMetricsStore",
        {
            "cluster": "default",
            "budget_bucket_size": 10,
            "budget_time_window": 120,
            "backoff_timer": 0,
        },
    )

    saved_attrs = realtime_metrics.__dict__.copy()
    wrapper.expose(realtime_metrics.__dict__)
    yield wrapper

    # cleanup: undo expose()'s monkey-patching of the service module
    realtime_metrics.__dict__.update(saved_attrs)


class TestScanForSuspectProjects:
    @pytest.fixture
    def mock_update_lpq_eligibility(
        self, monkeypatch: MonkeyPatch
    ) -> Generator[mock.Mock, None, None]:
        """Replace the eligibility task so the scan's delayed calls can be observed."""
        mocked = mock.Mock()
        monkeypatch.setattr(low_priority_symbolication, "update_lpq_eligibility", mocked)
        yield mocked

    def test_no_metrics_not_in_lpq(
        self, store: RealtimeMetricsStore, mock_update_lpq_eligibility: mock.Mock
    ) -> None:
        assert store.get_lpq_projects() == set()

        with TaskRunner():
            _scan_for_suspect_projects()

        # Nothing recorded and nothing in the LPQ: the scan is a no-op.
        assert store.get_lpq_projects() == set()
        assert not mock_update_lpq_eligibility.delay.called

    def test_no_metrics_in_lpq(
        self, store: RealtimeMetricsStore, mock_update_lpq_eligibility: mock.Mock
    ) -> None:
        store.add_project_to_lpq(17)
        assert store.get_lpq_projects() == {17}
        assert store.is_lpq_project(17)

        with TaskRunner():
            _scan_for_suspect_projects()

        # A project with no recorded metrics is removed from the LPQ directly,
        # without scheduling an eligibility check.
        assert store.get_lpq_projects() == set()
        assert not mock_update_lpq_eligibility.delay.called

    @freeze_time(datetime.fromtimestamp(1147))
    def test_has_metric(
        self, store: RealtimeMetricsStore, mock_update_lpq_eligibility: mock.Mock
    ) -> None:
        store.record_project_duration(17, 1.0)

        with TaskRunner():
            _scan_for_suspect_projects()

        # Any recorded duration defers the decision to update_lpq_eligibility.
        assert mock_update_lpq_eligibility.delay.called


class TestUpdateLpqEligibility:
    def test_no_metrics_in_lpq(self, store: RealtimeMetricsStore) -> None:
        store.add_project_to_lpq(17)
        assert store.get_lpq_projects() == {17}

        _update_lpq_eligibility(project_id=17)
        assert store.get_lpq_projects() == set()

    def test_no_metrics_not_in_lpq(self, store: RealtimeMetricsStore) -> None:
        _update_lpq_eligibility(project_id=17)
        assert store.get_lpq_projects() == set()

    @freeze_time(datetime.fromtimestamp(1147))
    def test_is_eligible_not_lpq(self, store: RealtimeMetricsStore) -> None:
        assert store.get_lpq_projects() == set()

        # A huge recorded duration makes the project eligible for demotion.
        store.record_project_duration(17, 1000000.0)

        _update_lpq_eligibility(project_id=17)
        assert store.get_lpq_projects() == {17}
        assert store.is_lpq_project(17)

    @freeze_time(datetime.fromtimestamp(0))
    def test_is_eligible_in_lpq(self, store: RealtimeMetricsStore) -> None:
        store.add_project_to_lpq(17)

        store.record_project_duration(17, 1000000.0)

        # Still eligible: the project stays in the LPQ.
        _update_lpq_eligibility(project_id=17)
        assert store.get_lpq_projects() == {17}
        assert store.is_lpq_project(17)

    def test_not_eligible_in_lpq(self, store: RealtimeMetricsStore) -> None:
        store.add_project_to_lpq(17)

        _update_lpq_eligibility(project_id=17)
        assert store.get_lpq_projects() == set()

    def test_not_eligible_not_lpq(self, store: RealtimeMetricsStore) -> None:
        _update_lpq_eligibility(project_id=17)
        assert store.get_lpq_projects() == set()

    @freeze_time(datetime.fromtimestamp(0))
    def test_is_eligible_recently_moved(self, store: RedisRealtimeMetricsStore) -> None:
        store._backoff_timer = 10
        # Abusing the fact that removing always updates the backoff timer
        # even if it's a noop
        store.remove_projects_from_lpq({17})

        store.record_project_duration(17, 1000000.0)

        # Eligible, but the backoff window keeps it out of the LPQ for now.
        _update_lpq_eligibility(17)
        assert store.get_lpq_projects() == set()

    def test_not_eligible_recently_moved(self, store: RedisRealtimeMetricsStore) -> None:
        store._backoff_timer = 10
        store.add_project_to_lpq(17)

        # Not eligible, but the backoff window keeps it in the LPQ for now;
        # probe an unrelated project (16) as a negative control.
        _update_lpq_eligibility(17)
        assert store.get_lpq_projects() == {17}
        assert not store.is_lpq_project(16)
        assert store.is_lpq_project(17)