Skip to content

Commit

Permalink
feat(capture): Add setting to be able to disable capture overflow ent…
Browse files Browse the repository at this point in the history
…irely (#21168)
  • Loading branch information
tkaemming authored Mar 28, 2024
1 parent 24e8176 commit d929ca9
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 12 deletions.
17 changes: 8 additions & 9 deletions posthog/api/capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -602,11 +602,6 @@ def capture_internal(
token=token,
)

# We aim to always partition by {team_id}:{distinct_id} but allow
# overriding this to deal with hot partitions in specific cases.
# Setting the partition key to None means using random partitioning.
kafka_partition_key = None

if event["event"] in SESSION_RECORDING_EVENT_NAMES:
session_id = event["properties"]["$session_id"]
headers = [
Expand All @@ -623,13 +618,17 @@ def capture_internal(
parsed_event, event["event"], partition_key=session_id, headers=headers, overflowing=overflowing
)

# We aim to always partition by {team_id}:{distinct_id} but allow
# overriding this to deal with hot partitions in specific cases.
# Setting the partition key to None means using random partitioning.
candidate_partition_key = f"{token}:{distinct_id}"

if (
distinct_id.lower() not in LIKELY_ANONYMOUS_IDS
and not is_randomly_partitioned(candidate_partition_key)
or historical
not historical
and settings.CAPTURE_ALLOW_RANDOM_PARTITIONING
and (distinct_id.lower() in LIKELY_ANONYMOUS_IDS or is_randomly_partitioned(candidate_partition_key))
):
kafka_partition_key = None
else:
kafka_partition_key = hashlib.sha256(candidate_partition_key.encode()).hexdigest()

return log_event(parsed_event, event["event"], partition_key=kafka_partition_key, historical=historical)
Expand Down
20 changes: 17 additions & 3 deletions posthog/api/test/test_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import base64
import gzip
import json
from django.test import override_settings
import lzstring
import pathlib
import pytest
Expand Down Expand Up @@ -281,8 +282,7 @@ def test_is_randomly_partitioned(self):
assert is_randomly_partitioned(override_key) is True

@patch("posthog.kafka_client.client._KafkaProducer.produce")
def test_capture_randomly_partitions_with_likely_anonymous_ids(self, kafka_produce):
"""Test is_randomly_partitioned in the presence of likely anonymous ids."""
def _do_test_capture_with_likely_anonymous_ids(self, kafka_produce, expect_random_partitioning: bool):
for distinct_id in LIKELY_ANONYMOUS_IDS:
data = {
"event": "$autocapture",
Expand All @@ -298,9 +298,23 @@ def test_capture_randomly_partitions_with_likely_anonymous_ids(self, kafka_produ
)

kafka_produce.assert_called_with(
topic=KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC, data=ANY, key=None, headers=None
topic=KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC,
data=ANY,
key=None if expect_random_partitioning else ANY,
headers=None,
)

if not expect_random_partitioning:
assert kafka_produce.mock_calls[0].kwargs["key"] is not None

def test_capture_randomly_partitions_with_likely_anonymous_ids(self):
    """Test is_randomly_partitioned in the presence of likely anonymous ids, if enabled.

    Runs the shared helper under both settings values: random partitioning
    is expected only when ``CAPTURE_ALLOW_RANDOM_PARTITIONING`` is enabled.
    """
    for allow_random_partitioning in (True, False):
        with override_settings(CAPTURE_ALLOW_RANDOM_PARTITIONING=allow_random_partitioning):
            self._do_test_capture_with_likely_anonymous_ids(
                expect_random_partitioning=allow_random_partitioning
            )

def test_cached_is_randomly_partitioned(self):
"""Assert the behavior of is_randomly_partitioned under certain cache settings.
Expand Down
5 changes: 5 additions & 0 deletions posthog/settings/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
# KEEP IN SYNC WITH plugin-server/src/config/config.ts
BUFFER_CONVERSION_SECONDS = get_from_env("BUFFER_CONVERSION_SECONDS", default=60, type_cast=int)

# Whether or not random partitioning (i.e. overflow routing) should be allowed.
# (Enabling this setting does not cause messages to be randomly
# partitioned.) Note that this setting, if disabled, takes precedence over other
# partitioning-related settings below.
CAPTURE_ALLOW_RANDOM_PARTITIONING = get_from_env("CAPTURE_ALLOW_RANDOM_PARTITIONING", True, type_cast=str_to_bool)

# A list of <team_id:distinct_id> pairs (in the format 2:myLovelyId) that we should use
# random partitioning for when producing events to the Kafka topic consumed by the plugin server.
Expand Down

0 comments on commit d929ca9

Please sign in to comment.