feat: signal which products are quota limited #16777

Closed
wants to merge 14 commits into from
125 changes: 121 additions & 4 deletions ee/api/test/test_capture.py
@@ -1,12 +1,13 @@
import hashlib
import json
-from typing import Any
+from typing import Any, Tuple
from unittest.mock import patch

from django.http import HttpResponse
from django.test.client import Client
from django.utils import timezone
from kafka.errors import NoBrokersAvailable
from rest_framework import status

from posthog.settings.data_stores import KAFKA_EVENTS_PLUGIN_INGESTION
from posthog.test.base import APIBaseTest

@@ -20,6 +21,54 @@ def setUp(self):
super().setUp()
self.client = Client()

def _send_event(self) -> HttpResponse:
event_response = self.client.post(
"/e/",
data={
"data": json.dumps(
[
{"event": "beep", "properties": {"distinct_id": "eeee", "token": self.team.api_token}},
{"event": "boop", "properties": {"distinct_id": "aaaa", "token": self.team.api_token}},
]
),
"api_key": self.team.api_token,
},
)

return event_response

def _send_session_recording_event(
self,
number_of_events=1,
event_data={},
snapshot_source=3,
snapshot_type=1,
session_id="abc123",
window_id="def456",
distinct_id="ghi789",
timestamp=1658516991883,
) -> Tuple[dict, HttpResponse]:
event = {
"event": "$snapshot",
"properties": {
"$snapshot_data": {
"type": snapshot_type,
"data": {"source": snapshot_source, "data": event_data},
"timestamp": timestamp,
},
"$session_id": session_id,
"$window_id": window_id,
"distinct_id": distinct_id,
},
"offset": 1993,
}

capture_recording_response = self.client.post(
"/s/", data={"data": json.dumps([event for _ in range(number_of_events)]), "api_key": self.team.api_token}
)

return event, capture_recording_response

@patch("posthog.kafka_client.client._KafkaProducer.produce")
def test_produce_to_kafka(self, kafka_produce):
response = self.client.post(
@@ -131,7 +180,7 @@ def test_kafka_connection_error(self, kafka_produce):
def test_partition_key_override(self, kafka_produce):
default_partition_key = f"{self.team.api_token}:id1"

-response = self.client.post(
+self.client.post(
"/capture/",
{
"data": json.dumps(
@@ -141,7 +190,7 @@ def test_partition_key_override(self, kafka_produce):
},
)

-# By default we use (the hash of) <team_id:distinct_id> as the partition key
+# By default, we use (the hash of) <team_id:distinct_id> as the partition key
kafka_produce_call = kafka_produce.call_args_list[0].kwargs
self.assertEqual(kafka_produce_call["key"], hashlib.sha256(default_partition_key.encode()).hexdigest())

@@ -162,3 +211,71 @@ def test_partition_key_override(self, kafka_produce):

kafka_produce_call = kafka_produce.call_args_list[1].kwargs
self.assertEqual(kafka_produce_call["key"], None)
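
# For reference, the default partition key asserted above is the SHA-256 hex
# digest of "<api_token>:<distinct_id>". A minimal standalone sketch of that
# computation (illustration only; this helper is hypothetical, not part of the PR):
import hashlib

def compute_partition_key(api_token: str, distinct_id: str) -> str:
    # All of one person's events in a project hash to the same Kafka partition.
    return hashlib.sha256(f"{api_token}:{distinct_id}".encode()).hexdigest()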

@patch("posthog.kafka_client.client._KafkaProducer.produce")
def test_quota_limits_ignored_if_disabled(self, kafka_produce) -> None:
from ee.billing.quota_limiting import QuotaResource, replace_limited_team_tokens

replace_limited_team_tokens(QuotaResource.RECORDINGS, {self.team.api_token: timezone.now().timestamp() + 10000})
replace_limited_team_tokens(QuotaResource.EVENTS, {self.team.api_token: timezone.now().timestamp() + 10000})
self._send_session_recording_event()
self.assertEqual(kafka_produce.call_count, 2)

@patch("posthog.kafka_client.client._KafkaProducer.produce")
def test_quota_limits(self, kafka_produce) -> None:
from ee.billing.quota_limiting import QuotaResource, replace_limited_team_tokens

def _produce_events():
kafka_produce.reset_mock()
self._send_session_recording_event()
self._send_event()

with self.settings(QUOTA_LIMITING_ENABLED=True):
replace_limited_team_tokens(QuotaResource.EVENTS, {})
replace_limited_team_tokens(QuotaResource.RECORDINGS, {})

_produce_events()
self.assertEqual(kafka_produce.call_count, 4)

replace_limited_team_tokens(QuotaResource.EVENTS, {self.team.api_token: timezone.now().timestamp() + 10000})
_produce_events()
self.assertEqual(kafka_produce.call_count, 2) # Only the recording event

replace_limited_team_tokens(
QuotaResource.RECORDINGS, {self.team.api_token: timezone.now().timestamp() + 10000}
)
_produce_events()
self.assertEqual(kafka_produce.call_count, 0) # No events

replace_limited_team_tokens(
QuotaResource.RECORDINGS, {self.team.api_token: timezone.now().timestamp() - 10000}
)
replace_limited_team_tokens(QuotaResource.EVENTS, {self.team.api_token: timezone.now().timestamp() - 10000})
_produce_events()
self.assertEqual(kafka_produce.call_count, 4) # All events as limit-until timestamp is in the past

@patch("posthog.kafka_client.client._KafkaProducer.produce")
def test_quota_limited_recordings_return_quota_limited_response(self, _kafka_produce) -> None:
with self.settings(QUOTA_LIMITING_ENABLED=True):
from ee.billing.quota_limiting import QuotaResource, replace_limited_team_tokens

replace_limited_team_tokens(
QuotaResource.RECORDINGS, {self.team.api_token: timezone.now().timestamp() + 10000}
)
_, response = self._send_session_recording_event()

assert response.json() == {
"status": 1,
"quotaLimited": ["sessionRecordings"],
}

@patch("posthog.kafka_client.client._KafkaProducer.produce")
def test_quota_limiting_does_not_affect_events_body(self, _kafka_produce) -> None:
with self.settings(QUOTA_LIMITING_ENABLED=True):
from ee.billing.quota_limiting import QuotaResource, replace_limited_team_tokens

replace_limited_team_tokens(QuotaResource.EVENTS, {self.team.api_token: timezone.now().timestamp() + 10000})

response = self._send_event()

assert response.json() == {"status": 1}
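
The quota tests above treat each limited-token map as token -> "limited until" Unix timestamp: a token is only dropped while that timestamp is in the future, which is why swapping in timezone.now().timestamp() - 10000 restores ingestion. A minimal in-memory sketch of that semantic, assuming the real replace_limited_team_tokens / list_limited_team_tokens in ee.billing.quota_limiting behave equivalently over a shared cache:

import time
from typing import Dict, List

_LIMITED_TOKENS: Dict[str, Dict[str, float]] = {"events": {}, "recordings": {}}

def replace_limited_team_tokens(resource: str, tokens: Dict[str, float]) -> None:
    # Swap in a whole new token -> limited-until-timestamp map for one resource.
    _LIMITED_TOKENS[resource] = dict(tokens)

def list_limited_team_tokens(resource: str) -> List[str]:
    # A token counts as limited only while its expiry timestamp is in the future.
    now = time.time()
    return [token for token, until in _LIMITED_TOKENS[resource].items() if until > now]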
@@ -1,6 +1,6 @@
# name: ClickhouseTestExperimentSecondaryResults.test_basic_secondary_metric_results
'
-/* user_id:55 celery:posthog.celery.sync_insight_caching_state */
+/* user_id:58 celery:posthog.celery.sync_insight_caching_state */
SELECT team_id,
date_diff('second', max(timestamp), now()) AS age
FROM events
@@ -1,6 +1,6 @@
# name: ClickhouseTestFunnelExperimentResults.test_experiment_flow_with_event_results
'
-/* user_id:61 celery:posthog.celery.sync_insight_caching_state */
+/* user_id:64 celery:posthog.celery.sync_insight_caching_state */
SELECT team_id,
date_diff('second', max(timestamp), now()) AS age
FROM events
@@ -138,7 +138,7 @@
---
# name: ClickhouseTestFunnelExperimentResults.test_experiment_flow_with_event_results_and_events_out_of_time_range_timezones
'
-/* user_id:62 celery:posthog.celery.sync_insight_caching_state */
+/* user_id:65 celery:posthog.celery.sync_insight_caching_state */
SELECT team_id,
date_diff('second', max(timestamp), now()) AS age
FROM events
@@ -276,7 +276,7 @@
---
# name: ClickhouseTestFunnelExperimentResults.test_experiment_flow_with_event_results_for_three_test_variants
'
-/* user_id:64 celery:posthog.celery.sync_insight_caching_state */
+/* user_id:67 celery:posthog.celery.sync_insight_caching_state */
SELECT team_id,
date_diff('second', max(timestamp), now()) AS age
FROM events
@@ -414,7 +414,7 @@
---
# name: ClickhouseTestFunnelExperimentResults.test_experiment_flow_with_event_results_with_hogql_aggregation
'
-/* user_id:65 celery:posthog.celery.sync_insight_caching_state */
+/* user_id:68 celery:posthog.celery.sync_insight_caching_state */
SELECT team_id,
date_diff('second', max(timestamp), now()) AS age
FROM events
@@ -552,7 +552,7 @@
---
# name: ClickhouseTestTrendExperimentResults.test_experiment_flow_with_event_results
'
-/* user_id:67 celery:posthog.celery.sync_insight_caching_state */
+/* user_id:70 celery:posthog.celery.sync_insight_caching_state */
SELECT team_id,
date_diff('second', max(timestamp), now()) AS age
FROM events
@@ -749,7 +749,7 @@
---
# name: ClickhouseTestTrendExperimentResults.test_experiment_flow_with_event_results_for_three_test_variants
'
-/* user_id:68 celery:posthog.celery.sync_insight_caching_state */
+/* user_id:71 celery:posthog.celery.sync_insight_caching_state */
SELECT team_id,
date_diff('second', max(timestamp), now()) AS age
FROM events
@@ -892,7 +892,7 @@
---
# name: ClickhouseTestTrendExperimentResults.test_experiment_flow_with_event_results_out_of_timerange_timezone
'
-/* user_id:70 celery:posthog.celery.sync_insight_caching_state */
+/* user_id:73 celery:posthog.celery.sync_insight_caching_state */
SELECT team_id,
date_diff('second', max(timestamp), now()) AS age
FROM events
@@ -1089,7 +1089,7 @@
---
# name: ClickhouseTestTrendExperimentResults.test_experiment_flow_with_event_results_with_hogql_filter
'
-/* user_id:72 celery:posthog.celery.sync_insight_caching_state */
+/* user_id:75 celery:posthog.celery.sync_insight_caching_state */
SELECT team_id,
date_diff('second', max(timestamp), now()) AS age
FROM events
37 changes: 31 additions & 6 deletions posthog/api/capture.py
@@ -1,3 +1,4 @@
import dataclasses
import hashlib
import json
import re
@@ -252,34 +253,47 @@ def drop_performance_events(events: List[Any]) -> List[Any]:
return cleaned_list


-def drop_events_over_quota(token: str, events: List[Any]) -> List[Any]:
+@dataclasses.dataclass(frozen=True)
+class EventsOverQuotaResult:
+events: List[Any]
+events_were_limited: bool
+recordings_were_limited: bool
+
+
+def drop_events_over_quota(token: str, events: List[Any]) -> EventsOverQuotaResult:
if not settings.EE_AVAILABLE:
-return events
+return EventsOverQuotaResult(events, False, False)

from ee.billing.quota_limiting import QuotaResource, list_limited_team_tokens

results = []
limited_tokens_events = list_limited_team_tokens(QuotaResource.EVENTS)
limited_tokens_recordings = list_limited_team_tokens(QuotaResource.RECORDINGS)

+recordings_were_limited = False
+events_were_limited = False
for event in events:
if event.get("event") in SESSION_RECORDING_EVENT_NAMES:
EVENTS_RECEIVED_COUNTER.labels(resource_type="recordings").inc()
if token in limited_tokens_recordings:
EVENTS_DROPPED_OVER_QUOTA_COUNTER.labels(resource_type="recordings", token=token).inc()
+if settings.QUOTA_LIMITING_ENABLED:
+recordings_were_limited = True
continue

else:
EVENTS_RECEIVED_COUNTER.labels(resource_type="events").inc()
if token in limited_tokens_events:
EVENTS_DROPPED_OVER_QUOTA_COUNTER.labels(resource_type="events", token=token).inc()
+if settings.QUOTA_LIMITING_ENABLED:
+events_were_limited = True
continue

results.append(event)

-return results
+return EventsOverQuotaResult(
+results, events_were_limited=events_were_limited, recordings_were_limited=recordings_were_limited
+)


@csrf_exempt
@@ -360,10 +374,15 @@ def get_event(request):
except Exception as e:
capture_exception(e)

+# TODO we're not going to return 429 on events before we audit our SDKs
+# events_were_quota_limited = False
+recordings_were_quota_limited = False
try:
-events = drop_events_over_quota(token, events)
+events_over_quota_result = drop_events_over_quota(token, events)
+events = events_over_quota_result.events
+# events_were_quota_limited = events_over_quota_result.events_were_limited
+recordings_were_quota_limited = events_over_quota_result.recordings_were_limited
except Exception as e:
+# NOTE: Whilst we are testing this code, we want to track exceptions but allow the events through if anything goes wrong
capture_exception(e)

consumer_destination = "v2" if random() <= settings.REPLAY_EVENTS_NEW_CONSUMER_RATIO else "v1"
@@ -481,7 +500,13 @@ def get_event(request):
pass

statsd.incr("posthog_cloud_raw_endpoint_success", tags={"endpoint": "capture"})
-return cors_response(request, JsonResponse({"status": 1}))
+
+response_payload: Dict[str, Any] = {"status": 1}
+
+if recordings_were_quota_limited:
+response_payload["quota_limited"] = ["recordings"]
+
+return cors_response(request, JsonResponse(response_payload))


def preprocess_events(events: List[Dict[str, Any]]) -> Iterator[Tuple[Dict[str, Any], UUIDT, str]]:
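After this change the capture endpoint's contract is: over-quota events are silently dropped, the response still carries {"status": 1}, and a "quota_limited" list names the throttled products (currently only "recordings", since events are excluded until the SDK audit mentioned in the TODO). A minimal sketch of how a caller might use the signal; all names here are hypothetical and not taken from any real SDK:

def products_over_quota(capture_response: dict) -> list:
    # The endpoint keeps returning {"status": 1} even when events are dropped,
    # so clients must inspect "quota_limited" rather than the HTTP status code.
    return capture_response.get("quota_limited", [])

# Usage sketch: pause recording uploads while "recordings" is limited.
if "recordings" in products_over_quota({"status": 1, "quota_limited": ["recordings"]}):
    print("session recordings over quota; pausing /s/ uploads")
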
16 changes: 8 additions & 8 deletions posthog/api/test/__snapshots__/test_cohort.ambr
@@ -1,6 +1,6 @@
# name: TestCohort.test_async_deletion_of_cohort
'
-/* user_id:102 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+/* user_id:100 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
SELECT count(DISTINCT person_id)
FROM cohortpeople
WHERE team_id = 2
@@ -10,7 +10,7 @@
---
# name: TestCohort.test_async_deletion_of_cohort.1
'
-/* user_id:102 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+/* user_id:100 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
INSERT INTO cohortpeople
SELECT id,
2 as cohort_id,
@@ -114,7 +114,7 @@
---
# name: TestCohort.test_async_deletion_of_cohort.2
'
-/* user_id:102 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+/* user_id:100 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
SELECT count(DISTINCT person_id)
FROM cohortpeople
WHERE team_id = 2
@@ -124,7 +124,7 @@
---
# name: TestCohort.test_async_deletion_of_cohort.3
'
-/* user_id:102 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
+/* user_id:100 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
SELECT count()
FROM cohortpeople
WHERE team_id = 2
@@ -134,7 +134,7 @@
---
# name: TestCohort.test_async_deletion_of_cohort.4
'
-/* user_id:102 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+/* user_id:100 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
SELECT count(DISTINCT person_id)
FROM cohortpeople
WHERE team_id = 2
@@ -144,7 +144,7 @@
---
# name: TestCohort.test_async_deletion_of_cohort.5
'
-/* user_id:102 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+/* user_id:100 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
INSERT INTO cohortpeople
SELECT id,
2 as cohort_id,
@@ -178,7 +178,7 @@
---
# name: TestCohort.test_async_deletion_of_cohort.6
'
-/* user_id:102 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+/* user_id:100 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
SELECT count(DISTINCT person_id)
FROM cohortpeople
WHERE team_id = 2
@@ -188,7 +188,7 @@
---
# name: TestCohort.test_async_deletion_of_cohort.7
'
-/* user_id:102 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
+/* user_id:100 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
SELECT count()
FROM cohortpeople
WHERE team_id = 2