diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index a6ee4e91a9b15..1efc0ae9aa3e3 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -74,6 +74,8 @@ export function getDefaultConfig(): PluginsServerConfig { TASKS_PER_WORKER: 10, INGESTION_CONCURRENCY: 10, INGESTION_BATCH_SIZE: 500, + INGESTION_OVERFLOW_ENABLED: false, + INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY: false, PLUGINS_DEFAULT_LOG_LEVEL: isTestEnv() ? PluginLogLevel.Full : PluginLogLevel.Log, LOG_LEVEL: isTestEnv() ? LogLevel.Warn : LogLevel.Info, SENTRY_DSN: null, diff --git a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts index 2b9c4ce77152d..e452140931c7c 100644 --- a/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/analytics-events-ingestion-consumer.ts @@ -4,7 +4,6 @@ import { Counter } from 'prom-client' import { buildStringMatcher } from '../../config/config' import { KAFKA_EVENTS_PLUGIN_INGESTION, prefix as KAFKA_PREFIX } from '../../config/kafka-topics' import { Hub } from '../../types' -import { isIngestionOverflowEnabled } from '../../utils/env-utils' import { status } from '../../utils/status' import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion' import { IngestionConsumer } from './kafka-queue' @@ -24,7 +23,7 @@ export const startAnalyticsEventsIngestionConsumer = async ({ Consumes analytics events from the Kafka topic `events_plugin_ingestion` and processes them for ingestion into ClickHouse. - Before processing, if isIngestionOverflowEnabled and an event has + Before processing, if overflow rerouting is enabled and an event has overflowed the capacity for its (team_id, distinct_id) pair, it will not be processed here but instead re-produced into the `events_plugin_ingestion_overflow` topic for later processing. @@ -47,7 +46,11 @@ export const startAnalyticsEventsIngestionConsumer = async ({ // deployment, we require an env variable to be set to confirm this before // enabling re-production of events to the OVERFLOW topic. - const overflowMode = isIngestionOverflowEnabled() ? IngestionOverflowMode.Reroute : IngestionOverflowMode.Disabled + const overflowMode = hub.INGESTION_OVERFLOW_ENABLED + ? hub.INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY + ? IngestionOverflowMode.Reroute + : IngestionOverflowMode.RerouteRandomly + : IngestionOverflowMode.Disabled const tokenBlockList = buildStringMatcher(hub.DROP_EVENTS_BY_TOKEN, false) const batchHandler = async (messages: Message[], queue: IngestionConsumer): Promise => { diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts index 588c2c92beb86..ec020cfb6921f 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts @@ -28,7 +28,8 @@ require('@sentry/tracing') export enum IngestionOverflowMode { Disabled, - Reroute, + Reroute, // preserves partition locality + RerouteRandomly, // discards partition locality ConsumeSplitByDistinctId, ConsumeSplitEvenly, } @@ -217,7 +218,9 @@ export async function eachBatchParallelIngestion( op: 'emitToOverflow', data: { eventCount: splitBatch.toOverflow.length }, }) - processingPromises.push(emitToOverflow(queue, splitBatch.toOverflow)) + processingPromises.push( + emitToOverflow(queue, splitBatch.toOverflow, overflowMode === IngestionOverflowMode.RerouteRandomly) + ) overflowSpan.finish() } @@ -257,14 +260,14 @@ function computeKey(pluginEvent: PipelineEvent): string { return `${pluginEvent.team_id ?? pluginEvent.token}:${pluginEvent.distinct_id}` } -async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[]) { +async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[], useRandomPartitioner: boolean) { ingestionOverflowingMessagesTotal.inc(kafkaMessages.length) await Promise.all( kafkaMessages.map((message) => queue.pluginsServer.kafkaProducer.produce({ topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, value: message.value, - key: null, // No locality guarantees in overflow + key: useRandomPartitioner ? undefined : message.key, headers: message.headers, waitForAck: true, }) @@ -286,6 +289,9 @@ export function splitIngestionBatch( toProcess: [], toOverflow: [], } + const shouldRerouteToOverflow = [IngestionOverflowMode.Reroute, IngestionOverflowMode.RerouteRandomly].includes( + overflowMode + ) if (overflowMode === IngestionOverflowMode.ConsumeSplitEvenly) { /** @@ -314,7 +320,7 @@ export function splitIngestionBatch( const batches: Map = new Map() for (const message of kafkaMessages) { - if (overflowMode === IngestionOverflowMode.Reroute && message.key == null) { + if (shouldRerouteToOverflow && message.key == null) { // Overflow detected by capture, reroute to overflow topic // Not applying tokenBlockList to save CPU. TODO: do so once token is in the message headers output.toOverflow.push(message) @@ -334,12 +340,8 @@ export function splitIngestionBatch( } const eventKey = computeKey(pluginEvent) - if ( - overflowMode === IngestionOverflowMode.Reroute && - !ConfiguredLimiter.consume(eventKey, 1, message.timestamp) - ) { + if (shouldRerouteToOverflow && !ConfiguredLimiter.consume(eventKey, 1, message.timestamp)) { // Local overflow detection triggering, reroute to overflow topic too - message.key = null ingestionPartitionKeyOverflowed.labels(`${pluginEvent.team_id ?? pluginEvent.token}`).inc() if (LoggingLimiter.consume(eventKey, 1)) { status.warn('🪣', `Local overflow detection triggered on key ${eventKey}`) diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 82ff40eaaca9d..d8a46819fad9b 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -94,6 +94,8 @@ export interface PluginsServerConfig { TASKS_PER_WORKER: number // number of parallel tasks per worker thread INGESTION_CONCURRENCY: number // number of parallel event ingestion queues per batch INGESTION_BATCH_SIZE: number // kafka consumer batch size + INGESTION_OVERFLOW_ENABLED: boolean // whether or not overflow rerouting is enabled (only used by analytics-ingestion) + INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY: boolean // whether or not Kafka message keys should be preserved or discarded when messages are rerouted to overflow TASK_TIMEOUT: number // how many seconds until tasks are timed out DATABASE_URL: string // Postgres database URL DATABASE_READONLY_URL: string // Optional read-only replica to the main Postgres database diff --git a/plugin-server/src/utils/env-utils.ts b/plugin-server/src/utils/env-utils.ts index c2c47f8b8e46d..77064a53ac8e3 100644 --- a/plugin-server/src/utils/env-utils.ts +++ b/plugin-server/src/utils/env-utils.ts @@ -42,11 +42,6 @@ export const isProdEnv = (): boolean => determineNodeEnv() === NodeEnv.Productio export const isCloud = (): boolean => !!process.env.CLOUD_DEPLOYMENT -export function isIngestionOverflowEnabled(): boolean { - const ingestionOverflowEnabled = process.env.INGESTION_OVERFLOW_ENABLED - return stringToBoolean(ingestionOverflowEnabled) -} - export function isOverflowBatchByDistinctId(): boolean { const overflowBatchByDistinctId = process.env.INGESTION_OVERFLOW_BATCH_BY_DISTINCT_ID return stringToBoolean(overflowBatchByDistinctId) diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts index e042cf5c1ac34..e95cb2a4e51c6 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts @@ -107,32 +107,35 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { expect(runEventPipeline).not.toHaveBeenCalled() }) - it('reroutes excess events to OVERFLOW topic', async () => { - const now = Date.now() - const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1], now) - const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false) - - const tokenBlockList = buildStringMatcher('another_token,more_token', false) - await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute) - - expect(consume).toHaveBeenCalledWith( - captureEndpointEvent1['token'] + ':' + captureEndpointEvent1['distinct_id'], - 1, - now - ) - expect(captureIngestionWarning).not.toHaveBeenCalled() - expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({ - topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, - value: JSON.stringify(captureEndpointEvent1), - timestamp: captureEndpointEvent1['timestamp'], - offset: captureEndpointEvent1['offset'], - key: null, - waitForAck: true, - }) + it.each([IngestionOverflowMode.Reroute, IngestionOverflowMode.RerouteRandomly])( + 'reroutes excess events to OVERFLOW topic (mode=%p)', + async (overflowMode) => { + const now = Date.now() + const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1], now) + const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false) + + const tokenBlockList = buildStringMatcher('another_token,more_token', false) + await eachBatchParallelIngestion(tokenBlockList, batch, queue, overflowMode) + + expect(consume).toHaveBeenCalledWith( + captureEndpointEvent1['token'] + ':' + captureEndpointEvent1['distinct_id'], + 1, + now + ) + expect(captureIngestionWarning).not.toHaveBeenCalled() + expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({ + topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, + value: JSON.stringify(captureEndpointEvent1), + timestamp: captureEndpointEvent1['timestamp'], + offset: captureEndpointEvent1['offset'], + key: overflowMode === IngestionOverflowMode.Reroute ? batch[0].key : undefined, + waitForAck: true, + }) - // Event is not processed here - expect(runEventPipeline).not.toHaveBeenCalled() - }) + // Event is not processed here + expect(runEventPipeline).not.toHaveBeenCalled() + } + ) it('does not reroute if not over capacity limit', async () => { const now = Date.now()