Skip to content

Commit

Permalink
Revert "ref: fix types for monitor_consumer (#83831)"
Browse files Browse the repository at this point in the history
This reverts commit 6b69f76.

Co-authored-by: IanWoodard <[email protected]>
  • Loading branch information
getsentry-bot and IanWoodard committed Jan 22, 2025
1 parent 2add871 commit 6ff0bb7
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 40 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ module = [
"sentry.middleware.auth",
"sentry.middleware.ratelimit",
"sentry.middleware.superuser",
"sentry.monitors.consumers.monitor_consumer",
"sentry.monitors.endpoints.base",
"sentry.monitors.endpoints.organization_monitor_index",
"sentry.net.http",
Expand Down Expand Up @@ -448,7 +449,6 @@ module = [
"sentry.models.groupinbox",
"sentry.models.groupsubscription",
"sentry.models.options.*",
"sentry.monitors.consumers.monitor_consumer",
"sentry.monkey.*",
"sentry.nodestore.*",
"sentry.nodestore.base",
Expand Down
61 changes: 27 additions & 34 deletions src/sentry/monitors/consumers/monitor_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from copy import deepcopy
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Literal, NotRequired, TypedDict
from typing import Literal

import sentry_sdk
from arroyo.backends.kafka.consumer import KafkaPayload
Expand Down Expand Up @@ -90,8 +90,8 @@
def _ensure_monitor_with_config(
project: Project,
monitor_slug: str,
config: dict[str, Any] | None,
) -> Monitor | None:
config: Mapping | None,
):
try:
monitor = Monitor.objects.get(
slug=monitor_slug,
Expand Down Expand Up @@ -176,9 +176,9 @@ def _ensure_monitor_with_config(


def check_killswitch(
metric_kwargs: dict[str, str],
metric_kwargs: Mapping,
project: Project,
) -> bool:
):
"""
Enforce organization level monitor kill switch. Returns true if the
killswitch is enforced.
Expand All @@ -194,7 +194,7 @@ def check_killswitch(
return is_blocked


def check_ratelimit(metric_kwargs: dict[str, str], item: CheckinItem) -> bool:
def check_ratelimit(metric_kwargs: Mapping, item: CheckinItem):
"""
Enforce check-in rate limits. Returns True if rate limit is enforced.
"""
Expand All @@ -218,19 +218,12 @@ def check_ratelimit(metric_kwargs: dict[str, str], item: CheckinItem) -> bool:
return is_blocked


class _CheckinUpdateKwargs(TypedDict):
status: NotRequired[CheckInStatus]
duration: int | None
timeout_at: NotRequired[datetime | None]
date_updated: NotRequired[datetime]


def transform_checkin_uuid(
txn: Transaction | Span,
metric_kwargs: dict[str, str],
metric_kwargs: Mapping,
monitor_slug: str,
check_in_id: str,
) -> tuple[uuid.UUID, bool] | tuple[None, Literal[False]]:
):
"""
Extracts the `UUID` object from the provided check_in_id. Failures will be logged.
Returns the UUID object and a boolean indicating if the provided GUID
Expand Down Expand Up @@ -272,14 +265,14 @@ def transform_checkin_uuid(

def update_existing_check_in(
txn: Transaction | Span,
metric_kwargs: dict[str, str],
metric_kwargs: Mapping,
project_id: int,
monitor_environment: MonitorEnvironment,
start_time: datetime,
existing_check_in: MonitorCheckIn,
updated_status: CheckInStatus,
updated_duration: int | None,
) -> None:
updated_duration: float,
):
monitor = monitor_environment.monitor
processing_errors: list[ProcessingError] = []

Expand Down Expand Up @@ -380,7 +373,7 @@ def update_existing_check_in(
if processing_errors:
raise ProcessingErrorsException(processing_errors, monitor=monitor)

updated_checkin: _CheckinUpdateKwargs = {
updated_checkin = {
"status": updated_status,
"duration": updated_duration,
}
Expand Down Expand Up @@ -414,7 +407,7 @@ def update_existing_check_in(
existing_check_in.update(**updated_checkin)


def _process_checkin(item: CheckinItem, txn: Transaction | Span) -> None:
def _process_checkin(item: CheckinItem, txn: Transaction | Span):
params = item.payload

start_time = to_datetime(float(item.message["start_time"]))
Expand Down Expand Up @@ -508,10 +501,13 @@ def _process_checkin(item: CheckinItem, txn: Transaction | Span) -> None:

monitor_config = params.pop("monitor_config", None)

if params.get("duration") is not None:
params["duration"] = (
# Duration is specified in seconds from the client, it is
# stored in the checkin model as milliseconds
params["duration"] = int(params["duration"] * 1000)
int(params["duration"] * 1000)
if params.get("duration") is not None
else None
)

validator = MonitorCheckInValidator(
data=params,
Expand Down Expand Up @@ -784,7 +780,6 @@ def _process_checkin(item: CheckinItem, txn: Transaction | Span) -> None:
category=DataCategory.MONITOR,
)

assert check_in.monitor_environment is not None
existing_env = check_in.monitor_environment.get_environment().name

env_mismatch_error: CheckinEnvironmentMismatch = {
Expand Down Expand Up @@ -919,7 +914,7 @@ def _process_checkin(item: CheckinItem, txn: Transaction | Span) -> None:
logger.exception("Failed to process check-in")


def process_checkin(item: CheckinItem) -> None:
def process_checkin(item: CheckinItem):
"""
Process an individual check-in
"""
Expand All @@ -937,7 +932,7 @@ def process_checkin(item: CheckinItem) -> None:
logger.exception("Failed to process check-in")


def process_checkin_group(items: list[CheckinItem]) -> None:
def process_checkin_group(items: list[CheckinItem]):
"""
Process a group of related check-ins (all part of the same monitor)
completely serially.
Expand All @@ -946,9 +941,7 @@ def process_checkin_group(items: list[CheckinItem]) -> None:
process_checkin(item)


def process_batch(
executor: ThreadPoolExecutor, message: Message[ValuesBatch[KafkaPayload]]
) -> None:
def process_batch(executor: ThreadPoolExecutor, message: Message[ValuesBatch[KafkaPayload]]):
"""
Receives batches of check-in messages. This function will take the batch
and group them together by monitor ID (ensuring order is preserved) and
Expand All @@ -959,8 +952,8 @@ def process_batch(
"""
batch = message.payload

latest_partition_ts: dict[int, datetime] = {}
checkin_mapping: dict[str, list[CheckinItem]] = defaultdict(list)
latest_partition_ts: Mapping[int, datetime] = {}
checkin_mapping: Mapping[str, list[CheckinItem]] = defaultdict(list)

for item in batch:
assert isinstance(item, BrokerValue)
Expand All @@ -979,13 +972,13 @@ def process_batch(
if wrapper["message_type"] == "clock_pulse":
continue

checkin_item = CheckinItem(
item = CheckinItem(
ts=item.timestamp,
partition=item.partition.index,
message=wrapper,
payload=json.loads(wrapper["payload"]),
)
checkin_mapping[checkin_item.processing_key].append(checkin_item)
checkin_mapping[item.processing_key].append(item)

# Number of check-ins that are being processed in this batch
metrics.gauge("monitors.checkin.parallel_batch_count", len(batch))
Expand All @@ -1001,7 +994,7 @@ def process_batch(
wait(futures)

# Update check in volume for the entire batch we've just processed
update_check_in_volume(item.timestamp for item in batch if item.timestamp is not None)
update_check_in_volume(item.timestamp for item in batch)

# Attempt to trigger monitor tasks across processed partitions
for partition, ts in latest_partition_ts.items():
Expand All @@ -1011,7 +1004,7 @@ def process_batch(
logger.exception("Failed to trigger monitor tasks")


def process_single(message: Message[KafkaPayload | FilteredPayload]) -> None:
def process_single(message: Message[KafkaPayload | FilteredPayload]):
assert not isinstance(message.payload, FilteredPayload)
assert isinstance(message.value, BrokerValue)

Expand Down
8 changes: 4 additions & 4 deletions src/sentry/monitors/system_incidents.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import logging
import statistics
from collections import Counter
from collections.abc import Generator, Iterable
from collections.abc import Generator, Sequence
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from enum import StrEnum
Expand Down Expand Up @@ -64,15 +64,15 @@
BACKFILL_CHUNKS = 10


def update_check_in_volume(ts_iter: Iterable[datetime]) -> None:
def update_check_in_volume(ts_list: Sequence[datetime]):
"""
Increment counters for a list of check-in timestamps. Each timestamp will be
trimmed to the minute and grouped appropriately
"""
redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

# Group timestamps down to the minute
for reference_ts, count in Counter(_make_reference_ts(ts) for ts in ts_iter).items():
for reference_ts, count in Counter(_make_reference_ts(ts) for ts in ts_list).items():
key = MONITOR_VOLUME_HISTORY.format(ts=reference_ts)

pipeline = redis_client.pipeline()
Expand Down Expand Up @@ -665,7 +665,7 @@ def _backfill_decisions(
return None


def _make_reference_ts(ts: datetime) -> int:
def _make_reference_ts(ts: datetime):
"""
Produce a timestamp number with the seconds and microsecond removed
"""
Expand Down
2 changes: 1 addition & 1 deletion src/sentry/monitors/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def get_max_runtime(max_runtime: int | None) -> timedelta:

# Generates a timeout_at value for new check-ins
def get_timeout_at(
monitor_config: dict | None, status: CheckInStatus, date_added: datetime | None
monitor_config: dict, status: CheckInStatus, date_added: datetime | None
) -> datetime | None:
if status == CheckInStatus.IN_PROGRESS and date_added is not None:
return date_added.replace(second=0, microsecond=0) + get_max_runtime(
Expand Down

0 comments on commit 6ff0bb7

Please sign in to comment.