Skip to content

Commit

Permalink
Revert "ref: Remove LPQ scanning (#71908)"
Browse files Browse the repository at this point in the history
This reverts commit aa4745a.

Co-authored-by: asottile-sentry <[email protected]>
  • Loading branch information
getsentry-bot and asottile-sentry committed Jun 4, 2024
1 parent 5e2e817 commit afe88ab
Show file tree
Hide file tree
Showing 11 changed files with 827 additions and 56 deletions.
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,10 @@ static/app/components/events/eventStatisticalDetector/ @getse
/src/sentry/processing/realtime_metrics/ @getsentry/owners-native
/src/sentry/tasks/app_store_connect.py @getsentry/owners-native
/src/sentry/tasks/assemble.py @getsentry/owners-native
/src/sentry/tasks/low_priority_symbolication.py @getsentry/owners-native
/src/sentry/tasks/symbolication.py @getsentry/owners-native
/src/sentry/utils/appleconnect/ @getsentry/owners-native
/tests/sentry/tasks/test_low_priority_symbolication.py @getsentry/owners-native
/src/sentry/tasks/reprocessing.py @getsentry/processing
/src/sentry/tasks/reprocessing2.py @getsentry/processing
/src/sentry/reprocessing2.py @getsentry/processing
Expand Down
7 changes: 5 additions & 2 deletions src/sentry/api/serializers/models/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from sentry.processing import realtime_metrics
from sentry.roles import organization_roles
from sentry.snuba import discover
from sentry.tasks.symbolication import should_demote_symbolication

STATUS_LABELS = {
ObjectStatus.ACTIVE: "active",
Expand Down Expand Up @@ -678,14 +679,16 @@ def get_attrs(
if not self._collapse(LATEST_DEPLOYS_KEY):
deploys_by_project = self.get_deploys_by_project(item_list)

with sentry_sdk.start_span(op="project_summary_serializer.get_lpq_projects"):
lpq_projects = realtime_metrics.get_lpq_projects()
for item in item_list:
attrs[item]["latest_release"] = latest_release_versions.get(item.id)
attrs[item]["environments"] = environments_by_project.get(item.id, [])
attrs[item]["has_user_reports"] = item.id in projects_with_user_reports
if not self._collapse(LATEST_DEPLOYS_KEY):
attrs[item]["deploys"] = deploys_by_project.get(item.id)
attrs[item]["symbolication_degraded"] = realtime_metrics.is_lpq_project(
project_id=item.id
attrs[item]["symbolication_degraded"] = should_demote_symbolication(
project_id=item.id, lpq_projects=lpq_projects, emit_metrics=False
)

return attrs
Expand Down
1 change: 1 addition & 0 deletions src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"sentry.tasks.files",
"sentry.tasks.groupowner",
"sentry.tasks.integrations",
"sentry.tasks.low_priority_symbolication",
"sentry.tasks.merge",
"sentry.tasks.options",
"sentry.tasks.ping",
Expand Down
5 changes: 5 additions & 0 deletions src/sentry/processing/realtime_metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,9 @@
# Module-level store instance; its methods are re-exported below so callers can
# write `realtime_metrics.<name>(...)` without touching the instance directly.
__realtime_metrics_store__ = RealtimeMetricsStore()
validate = __realtime_metrics_store__.validate
record_project_duration = __realtime_metrics_store__.record_project_duration
# Budget inspection helpers.
projects = __realtime_metrics_store__.projects
get_used_budget_for_project = __realtime_metrics_store__.get_used_budget_for_project
# Low priority queue (LPQ) membership queries and mutations.
get_lpq_projects = __realtime_metrics_store__.get_lpq_projects
is_lpq_project = __realtime_metrics_store__.is_lpq_project
add_project_to_lpq = __realtime_metrics_store__.add_project_to_lpq
remove_projects_from_lpq = __realtime_metrics_store__.remove_projects_from_lpq
59 changes: 59 additions & 0 deletions src/sentry/processing/realtime_metrics/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from collections.abc import Iterable

from sentry.utils.services import Service


Expand All @@ -7,7 +9,12 @@ class RealtimeMetricsStore(Service):
# Names of the methods that make up this store's interface.
# NOTE(review): presumably read by the `Service` base class to decide which
# methods to expose — confirm against sentry.utils.services.
__all__ = (
"validate",
"record_project_duration",
"projects",
"get_used_budget_for_project",
"get_lpq_projects",
"is_lpq_project",
"add_project_to_lpq",
"remove_projects_from_lpq",
)

def validate(self) -> None:
Expand Down Expand Up @@ -46,8 +53,60 @@ def record_project_duration(self, project_id: int, duration: float) -> None:
"""
raise NotImplementedError

def projects(self) -> Iterable[int]:
    """
    Produce the IDs of every project that is a candidate for the low priority queue.

    Abstract hook: the base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()

def get_used_budget_for_project(self, project_id: int) -> float:
    """
    Report the average per-second budget the given project used over the configured
    sliding time window.

    Abstract hook: the base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()

def get_lpq_projects(self) -> set[int]:
    """
    Fetch the projects that are currently using the low priority queue.

    Returns a set of project IDs.
    Abstract hook: the base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()

def is_lpq_project(self, project_id: int) -> bool:
    """
    Tell whether the given project is currently using the low priority queue.

    Abstract hook: the base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()

def add_project_to_lpq(self, project_id: int) -> bool:
    """
    Assign a project to the low priority queue.

    This records the intent that every symbolication event triggered by the project
    be routed to the low priority queue. A backoff timer is applied to the project so
    that it cannot be automatically evicted from the queue while the timer is active.

    Returns True when the project is a new addition, and False when it was already
    assigned to the low priority queue. Implementations may raise if registering the
    project with the queue fails.

    Abstract hook: the base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()

def remove_projects_from_lpq(self, project_ids: set[int]) -> int:
    """
    Unassign projects from the low priority queue.

    This records the intent to move all given projects back to the regular queue. A
    backoff timer is applied to each project so that it cannot be automatically
    re-assigned to the low priority queue while the timer is active.

    Returns how many projects were actually removed; projects that were not in the
    low priority queue to begin with are not counted. Implementations may raise if
    deregistering the projects from the queue fails.

    Abstract hook: the base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
16 changes: 16 additions & 0 deletions src/sentry/processing/realtime_metrics/dummy.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from collections.abc import Iterable
from typing import Any

from . import base
Expand All @@ -16,5 +17,20 @@ def validate(self) -> None:
def record_project_duration(self, project_id: int, duration: float) -> None:
    """No-op: this store discards every recorded duration."""
    return None

def projects(self) -> Iterable[int]:
    """Generator that yields nothing: this store tracks no projects."""
    yield from []

def get_used_budget_for_project(self, project_id: int) -> float:
    """Always report a used budget of zero, whatever the project."""
    used_budget = 0.0
    return used_budget

def get_lpq_projects(self) -> set[int]:
    """Always return a fresh empty set: no project is ever in the low priority queue."""
    no_projects: set[int] = set()
    return no_projects

def is_lpq_project(self, project_id: int) -> bool:
    """Always answer False: no project is ever in the low priority queue."""
    return False

def add_project_to_lpq(self, project_id: int) -> bool:
    """Do nothing and return False, i.e. the project is never reported as newly added."""
    return False

def remove_projects_from_lpq(self, project_ids: set[int]) -> int:
    """Do nothing and return 0, i.e. no project is ever reported as removed."""
    return 0
16 changes: 16 additions & 0 deletions src/sentry/processing/realtime_metrics/pb.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from collections.abc import Iterable
from urllib.parse import urljoin

from requests import RequestException
Expand Down Expand Up @@ -57,3 +58,18 @@ def is_lpq_project(self, project_id: int) -> bool:

def validate(self) -> None:
    """No-op: this store performs no configuration validation."""
    return None

def projects(self) -> Iterable[int]:
    """Generator that yields nothing: this store does not enumerate projects."""
    yield from []

def get_used_budget_for_project(self, project_id: int) -> float:
    """Always report a used budget of zero, whatever the project."""
    used_budget = 0.0
    return used_budget

def get_lpq_projects(self) -> set[int]:
    """Always return a fresh empty set of project IDs."""
    no_projects: set[int] = set()
    return no_projects

def add_project_to_lpq(self, project_id: int) -> bool:
    """Do nothing and return False, i.e. the project is never reported as newly added."""
    return False

def remove_projects_from_lpq(self, project_ids: set[int]) -> int:
    """Do nothing and return 0, i.e. no project is ever reported as removed."""
    return 0
145 changes: 118 additions & 27 deletions src/sentry/processing/realtime_metrics/redis.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
import logging
from collections.abc import Iterable, Sequence
from time import time

from django.conf import settings

from sentry.exceptions import InvalidConfiguration
from sentry.utils import redis

from . import base

logger = logging.getLogger(__name__)
# redis key for entry storing current list of LPQ members
LPQ_MEMBERS_KEY = "store.symbolicate-event-lpq-selected"

# Redis key prefix for storing the LPQ status of projects
MEMBER_KEY_PREFIX = "symbolicate_event_low_priority:members"
# Redis key prefix for storing the budgets of projects
BUDGET_KEY_PREFIX = "symbolicate_event_low_priority:budget"
logger = logging.getLogger(__name__)


class RedisRealtimeMetricsStore(base.RealtimeMetricsStore):
Expand All @@ -37,6 +34,7 @@ def __init__(
self.cluster = redis.redis_clusters.get(cluster)
self._budget_bucket_size = budget_bucket_size
self._budget_time_window = budget_time_window
self._prefix = "symbolicate_event_low_priority"
self._backoff_timer = backoff_timer

self.validate()
Expand All @@ -48,11 +46,31 @@ def validate(self) -> None:
if self._budget_time_window < 60:
raise InvalidConfiguration("budget time window must be at least a minute")

if self._backoff_timer < 1:
raise InvalidConfiguration("backoff timer must be at least a second")

def _budget_key_prefix(self) -> str:
return f"{BUDGET_KEY_PREFIX}:{self._budget_bucket_size}"
return f"{self._prefix}:budget:{self._budget_bucket_size}"

def _backoff_key_prefix(self) -> str:
return f"{self._prefix}:backoff"

def _register_backoffs(self, project_ids: Sequence[int]) -> None:
if len(project_ids) == 0 or self._backoff_timer == 0:
return

with self.cluster.pipeline(transaction=False) as pipeline:
for project_id in project_ids:
key = f"{self._backoff_key_prefix()}:{project_id}"
# Can't use mset because it doesn't allow also specifying an expiry
pipeline.set(name=key, value="1", ex=self._backoff_timer)

pipeline.execute()

def _is_backing_off(self, project_id: int) -> bool:
"""
Returns whether a project is currently in the middle of its backoff timer from having
recently been assigned to or unassigned from the LPQ.
"""
key = f"{self._backoff_key_prefix()}:{project_id}"
return self.cluster.get(key) is not None

def record_project_duration(self, project_id: int, duration: float) -> None:
"""
Expand Down Expand Up @@ -96,9 +114,31 @@ def record_project_duration(self, project_id: int, duration: float) -> None:
pipeline.expire(key, self._budget_time_window + self._budget_bucket_size)
pipeline.execute()

def is_lpq_project(self, project_id: int) -> bool:
def projects(self) -> Iterable[int]:
    """
    Yield the ID of every project that has budget metrics recorded in the store.

    Each project ID is yielded once, in the order it is first encountered while
    scanning the budget keys. Errors raised while scanning redis, or while parsing a
    key that does not have the expected shape, propagate to the caller.
    """
    emitted: set[int] = set()
    pattern = self._budget_key_prefix() + ":*"

    for key in self.cluster.scan_iter(match=pattern):
        # Budget keys have five ":"-separated parts:
        # <prefix>:budget:<bucket size>:<project id>:<bucket timestamp>
        _, _, _, raw_project_id, _ = key.split(":", maxsplit=4)
        candidate = int(raw_project_id)
        if candidate in emitted:
            continue
        emitted.add(candidate)
        yield candidate

def get_used_budget_for_project(self, project_id: int) -> float:
"""
Returns the average used per-second budget for a given project during the configured sliding time window.
"""
timestamp = int(time())

Expand All @@ -110,23 +150,74 @@ def is_lpq_project(self, project_id: int) -> bool:

buckets = range(first_bucket, now_bucket + bucket_size, bucket_size)
keys = [f"{self._budget_key_prefix()}:{project_id}:{ts}" for ts in buckets]
member_key = f"{MEMBER_KEY_PREFIX}:{project_id}"
keys.insert(0, member_key)
results = self.cluster.mget(keys)
is_lpq = results[0]
counts = results[1:]

if is_lpq is not None:
return True
counts = self.cluster.mget(keys)

total_time_window = timestamp - first_bucket
total_sum = sum(int(c) if c else 0 for c in counts)

# the counts in redis are in ms resolution.
average_used = total_sum / total_time_window / 1000
budget = settings.SENTRY_LPQ_OPTIONS["project_budget"]
new_is_lpq = average_used > budget
return total_sum / total_time_window / 1000

def get_lpq_projects(self) -> set[int]:
    """
    Fetch the projects that are currently using the low priority queue.

    Returns a set of project IDs, read from the members of the LPQ redis set and
    converted to int. Any error raised while fetching from redis propagates to the
    caller.
    """
    members = self.cluster.smembers(LPQ_MEMBERS_KEY)
    return set(map(int, members))

def is_lpq_project(self, project_id: int) -> bool:
    """
    Tell whether the given project is currently using the low priority queue, i.e.
    whether its ID is a member of the LPQ redis set.
    """
    in_lpq = self.cluster.sismember(LPQ_MEMBERS_KEY, project_id)
    return bool(in_lpq)

def add_project_to_lpq(self, project_id: int) -> bool:
    """
    Assign a project to the low priority queue.

    This records the intent that every symbolication event triggered by the project
    be routed to the low priority queue, and applies a backoff timer that keeps the
    project from being automatically evicted while the timer is active.

    Returns True when the project is a new addition to the LPQ set. Returns False
    when it was already a member, or when the project is currently backing off (in
    which case nothing is changed). Errors raised by redis propagate to the caller.
    """
    if not self._is_backing_off(project_id):
        # Once SADD completes, the project is expected to be in the set.
        newly_added = int(self.cluster.sadd(LPQ_MEMBERS_KEY, project_id)) > 0
        self._register_backoffs([project_id])
        return newly_added

    return False

def remove_projects_from_lpq(self, project_ids: set[int]) -> int:
    """
    Unassign projects from the low priority queue.

    This records the intent to move the given projects back to the regular queue,
    and applies a backoff timer that keeps each removed project from being
    automatically re-assigned while the timer is active. Projects that are currently
    backing off are left untouched.

    Returns the number of projects actually removed from the LPQ set; projects that
    were not members to begin with are not counted, and 0 is returned when no
    project is eligible for removal. Errors raised by redis propagate to the caller.
    """
    removable = [
        project_id for project_id in project_ids if not self._is_backing_off(project_id)
    ]

    if not removable:
        return 0

    # SREM returns the number of members removed and raises on failure; once it
    # completes, the projects are expected to no longer be in the set.
    removed = int(self.cluster.srem(LPQ_MEMBERS_KEY, *removable))
    self._register_backoffs(removable)
    return removed
Loading

0 comments on commit afe88ab

Please sign in to comment.