diff --git a/pyproject.toml b/pyproject.toml index 7a2f440d4e2683..95e441951d10ed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -223,6 +223,7 @@ module = [ "sentry.middleware.auth", "sentry.middleware.ratelimit", "sentry.middleware.superuser", + "sentry.monitors.consumers.monitor_consumer", "sentry.monitors.endpoints.base", "sentry.monitors.endpoints.organization_monitor_index", "sentry.net.http", @@ -448,7 +449,6 @@ module = [ "sentry.models.groupinbox", "sentry.models.groupsubscription", "sentry.models.options.*", - "sentry.monitors.consumers.monitor_consumer", "sentry.monkey.*", "sentry.nodestore.*", "sentry.nodestore.base", diff --git a/src/sentry/monitors/consumers/monitor_consumer.py b/src/sentry/monitors/consumers/monitor_consumer.py index 67e5aa2691ce5c..0c2a806298bd1e 100644 --- a/src/sentry/monitors/consumers/monitor_consumer.py +++ b/src/sentry/monitors/consumers/monitor_consumer.py @@ -8,7 +8,7 @@ from copy import deepcopy from datetime import datetime, timedelta from functools import partial -from typing import Any, Literal, NotRequired, TypedDict +from typing import Literal import sentry_sdk from arroyo.backends.kafka.consumer import KafkaPayload @@ -90,8 +90,8 @@ def _ensure_monitor_with_config( project: Project, monitor_slug: str, - config: dict[str, Any] | None, -) -> Monitor | None: + config: Mapping | None, +): try: monitor = Monitor.objects.get( slug=monitor_slug, @@ -176,9 +176,9 @@ def _ensure_monitor_with_config( def check_killswitch( - metric_kwargs: dict[str, str], + metric_kwargs: Mapping, project: Project, -) -> bool: +): """ Enforce organization level monitor kill switch. Returns true if the killswitch is enforced. @@ -194,7 +194,7 @@ def check_killswitch( return is_blocked -def check_ratelimit(metric_kwargs: dict[str, str], item: CheckinItem) -> bool: +def check_ratelimit(metric_kwargs: Mapping, item: CheckinItem): """ Enforce check-in rate limits. Returns True if rate limit is enforced. """ @@ -218,19 +218,12 @@ def check_ratelimit(metric_kwargs: dict[str, str], item: CheckinItem) -> bool: return is_blocked -class _CheckinUpdateKwargs(TypedDict): - status: NotRequired[CheckInStatus] - duration: int | None - timeout_at: NotRequired[datetime | None] - date_updated: NotRequired[datetime] - - def transform_checkin_uuid( txn: Transaction | Span, - metric_kwargs: dict[str, str], + metric_kwargs: Mapping, monitor_slug: str, check_in_id: str, -) -> tuple[uuid.UUID, bool] | tuple[None, Literal[False]]: +): """ Extracts the `UUID` object from the provided check_in_id. Failures will be logged. Returns the UUID object and a boolean indicating if the provided GUID @@ -272,14 +265,14 @@ def transform_checkin_uuid( def update_existing_check_in( txn: Transaction | Span, - metric_kwargs: dict[str, str], + metric_kwargs: Mapping, project_id: int, monitor_environment: MonitorEnvironment, start_time: datetime, existing_check_in: MonitorCheckIn, updated_status: CheckInStatus, - updated_duration: int | None, -) -> None: + updated_duration: float, +): monitor = monitor_environment.monitor processing_errors: list[ProcessingError] = [] @@ -380,7 +373,7 @@ def update_existing_check_in( if processing_errors: raise ProcessingErrorsException(processing_errors, monitor=monitor) - updated_checkin: _CheckinUpdateKwargs = { + updated_checkin = { "status": updated_status, "duration": updated_duration, } @@ -414,7 +407,7 @@ def update_existing_check_in( existing_check_in.update(**updated_checkin) -def _process_checkin(item: CheckinItem, txn: Transaction | Span) -> None: +def _process_checkin(item: CheckinItem, txn: Transaction | Span): params = item.payload start_time = to_datetime(float(item.message["start_time"])) @@ -508,10 +501,13 @@ def _process_checkin(item: CheckinItem, txn: Transaction | Span) -> None: monitor_config = params.pop("monitor_config", None) - if params.get("duration") is not None: + params["duration"] = ( # Duration is specified in seconds from the client, it is # stored in the checkin model as milliseconds - params["duration"] = int(params["duration"] * 1000) + int(params["duration"] * 1000) + if params.get("duration") is not None + else None + ) validator = MonitorCheckInValidator( data=params, @@ -784,7 +780,6 @@ def _process_checkin(item: CheckinItem, txn: Transaction | Span) -> None: category=DataCategory.MONITOR, ) - assert check_in.monitor_environment is not None existing_env = check_in.monitor_environment.get_environment().name env_mismatch_error: CheckinEnvironmentMismatch = { @@ -919,7 +914,7 @@ def _process_checkin(item: CheckinItem, txn: Transaction | Span) -> None: logger.exception("Failed to process check-in") -def process_checkin(item: CheckinItem) -> None: +def process_checkin(item: CheckinItem): """ Process an individual check-in """ @@ -937,7 +932,7 @@ def process_checkin(item: CheckinItem) -> None: logger.exception("Failed to process check-in") -def process_checkin_group(items: list[CheckinItem]) -> None: +def process_checkin_group(items: list[CheckinItem]): """ Process a group of related check-ins (all part of the same monitor) completely serially. @@ -946,9 +941,7 @@ def process_checkin_group(items: list[CheckinItem]) -> None: process_checkin(item) -def process_batch( - executor: ThreadPoolExecutor, message: Message[ValuesBatch[KafkaPayload]] -) -> None: +def process_batch(executor: ThreadPoolExecutor, message: Message[ValuesBatch[KafkaPayload]]): """ Receives batches of check-in messages. This function will take the batch and group them together by monitor ID (ensuring order is preserved) and @@ -959,8 +952,8 @@ def process_batch( """ batch = message.payload - latest_partition_ts: dict[int, datetime] = {} - checkin_mapping: dict[str, list[CheckinItem]] = defaultdict(list) + latest_partition_ts: Mapping[int, datetime] = {} + checkin_mapping: Mapping[str, list[CheckinItem]] = defaultdict(list) for item in batch: assert isinstance(item, BrokerValue) @@ -979,13 +972,13 @@ def process_batch( if wrapper["message_type"] == "clock_pulse": continue - checkin_item = CheckinItem( + item = CheckinItem( ts=item.timestamp, partition=item.partition.index, message=wrapper, payload=json.loads(wrapper["payload"]), ) - checkin_mapping[checkin_item.processing_key].append(checkin_item) + checkin_mapping[item.processing_key].append(item) # Number of check-ins that are being processed in this batch metrics.gauge("monitors.checkin.parallel_batch_count", len(batch)) @@ -1001,7 +994,7 @@ def process_batch( wait(futures) # Update check in volume for the entire batch we've just processed - update_check_in_volume(item.timestamp for item in batch if item.timestamp is not None) + update_check_in_volume(item.timestamp for item in batch) # Attempt to trigger monitor tasks across processed partitions for partition, ts in latest_partition_ts.items(): @@ -1011,7 +1004,7 @@ def process_batch( logger.exception("Failed to trigger monitor tasks") -def process_single(message: Message[KafkaPayload | FilteredPayload]) -> None: +def process_single(message: Message[KafkaPayload | FilteredPayload]): assert not isinstance(message.payload, FilteredPayload) assert isinstance(message.value, BrokerValue) diff --git a/src/sentry/monitors/system_incidents.py b/src/sentry/monitors/system_incidents.py index 21803cb3974ce2..4b7531802b90e7 100644 --- a/src/sentry/monitors/system_incidents.py +++ b/src/sentry/monitors/system_incidents.py @@ -11,7 +11,7 @@ import logging import statistics from collections import Counter -from collections.abc import Generator, Iterable +from collections.abc import Generator, Sequence from dataclasses import dataclass from datetime import UTC, datetime, timedelta from enum import StrEnum @@ -64,7 +64,7 @@ BACKFILL_CHUNKS = 10 -def update_check_in_volume(ts_iter: Iterable[datetime]) -> None: +def update_check_in_volume(ts_list: Sequence[datetime]): """ Increment counters for a list of check-in timestamps. Each timestamp will be trimmed to the minute and grouped appropriately @@ -72,7 +72,7 @@ def update_check_in_volume(ts_iter: Iterable[datetime]) -> None: redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER) # Group timestamps down to the minute - for reference_ts, count in Counter(_make_reference_ts(ts) for ts in ts_iter).items(): + for reference_ts, count in Counter(_make_reference_ts(ts) for ts in ts_list).items(): key = MONITOR_VOLUME_HISTORY.format(ts=reference_ts) pipeline = redis_client.pipeline() @@ -665,7 +665,7 @@ def _backfill_decisions( return None -def _make_reference_ts(ts: datetime) -> int: +def _make_reference_ts(ts: datetime): """ Produce a timestamp number with the seconds and microsecond removed """ diff --git a/src/sentry/monitors/utils.py b/src/sentry/monitors/utils.py index 1b6719d2977afe..f8a87696323658 100644 --- a/src/sentry/monitors/utils.py +++ b/src/sentry/monitors/utils.py @@ -73,7 +73,7 @@ def get_max_runtime(max_runtime: int | None) -> timedelta: # Generates a timeout_at value for new check-ins def get_timeout_at( - monitor_config: dict | None, status: CheckInStatus, date_added: datetime | None + monitor_config: dict, status: CheckInStatus, date_added: datetime | None ) -> datetime | None: if status == CheckInStatus.IN_PROGRESS and date_added is not None: return date_added.replace(second=0, microsecond=0) + get_max_runtime(