From dc36a05418584ca038a6ef0c0f93f038a65722f9 Mon Sep 17 00:00:00 2001
From: Ted Kaemming <65315+tkaemming@users.noreply.github.com>
Date: Wed, 3 Apr 2024 12:05:51 -0700
Subject: [PATCH 1/3] Pass `produce` calls through to real producer.

---
 ...nalytics-events-ingestion-consumer.test.ts | 33 +++++++++++--------
 1 file changed, 20 insertions(+), 13 deletions(-)

diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts
index e042cf5c1ac34..3f224a1e86604 100644
--- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts
@@ -4,6 +4,8 @@ import {
     eachBatchParallelIngestion,
     IngestionOverflowMode,
 } from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion'
+import { Hub } from '../../../src/types'
+import { createHub } from '../../../src/utils/db/hub'
 import { ConfiguredLimiter } from '../../../src/utils/token-bucket'
 import { captureIngestionWarning } from './../../../src/worker/ingestion/utils'
 
@@ -47,6 +49,8 @@ const captureEndpointEvent2 = {
 }
 
 describe('eachBatchParallelIngestion with overflow reroute', () => {
+    let hub: Hub
+    let closeServer: () => Promise<void>
     let queue: any
 
     function createBatchWithMultipleEventsWithKeys(events: any[], timestamp?: any): any {
@@ -60,20 +64,20 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
         }))
     }
 
-    beforeEach(() => {
+    beforeEach(async () => {
+        ;[hub, closeServer] = await createHub()
         queue = {
             bufferSleep: jest.fn(),
-            pluginsServer: {
-                INGESTION_CONCURRENCY: 4,
-                kafkaProducer: {
-                    produce: jest.fn(),
-                },
-                db: 'database',
-            },
+            pluginsServer: hub,
         }
         jest.mock('./../../../src/worker/ingestion/event-pipeline/runner')
     })
 
+    afterEach(async () => {
+        await closeServer()
+        jest.clearAllMocks()
+    })
+
     it('reroutes events with no key to OVERFLOW topic', async () => {
         const batch = [
             {
@@ -86,15 +90,15 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
                 token: 'ok',
             },
         ]
-
         const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false)
+        const produce = jest.spyOn(queue.pluginsServer.kafkaProducer, 'produce')
 
         const tokenBlockList = buildStringMatcher('another_token,more_token', false)
         await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute)
 
         expect(consume).not.toHaveBeenCalled()
         expect(captureIngestionWarning).not.toHaveBeenCalled()
-        expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({
+        expect(produce).toHaveBeenCalledWith({
             topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
             value: JSON.stringify(captureEndpointEvent1),
             timestamp: captureEndpointEvent1['timestamp'],
@@ -111,6 +115,7 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
         const now = Date.now()
         const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1], now)
         const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false)
+        const produce = jest.spyOn(queue.pluginsServer.kafkaProducer, 'produce')
 
         const tokenBlockList = buildStringMatcher('another_token,more_token', false)
         await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute)
@@ -121,7 +126,7 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
             now
         )
         expect(captureIngestionWarning).not.toHaveBeenCalled()
-        expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({
+        expect(produce).toHaveBeenCalledWith({
             topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
             value: JSON.stringify(captureEndpointEvent1),
             timestamp: captureEndpointEvent1['timestamp'],
@@ -138,6 +143,7 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
         const now = Date.now()
         const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1, captureEndpointEvent2], now)
         const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => true)
+        const produce = jest.spyOn(queue.pluginsServer.kafkaProducer, 'produce')
 
         const tokenBlockList = buildStringMatcher('another_token,more_token', false)
         await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute)
@@ -153,7 +159,7 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
             now
         )
         expect(captureIngestionWarning).not.toHaveBeenCalled()
-        expect(queue.pluginsServer.kafkaProducer.produce).not.toHaveBeenCalled()
+        expect(produce).not.toHaveBeenCalled()
         // Event is processed
         expect(runEventPipeline).toHaveBeenCalledTimes(2)
     })
@@ -164,6 +170,7 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
             [captureEndpointEvent1, captureEndpointEvent2, captureEndpointEvent1],
             now
         )
+        const produce = jest.spyOn(queue.pluginsServer.kafkaProducer, 'produce')
         const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => true)
 
         const tokenBlockList = buildStringMatcher('mytoken,another_token', false)
@@ -176,7 +183,7 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
             now
         )
         expect(captureIngestionWarning).not.toHaveBeenCalled()
-        expect(queue.pluginsServer.kafkaProducer.produce).not.toHaveBeenCalled()
+        expect(produce).not.toHaveBeenCalled()
         expect(runEventPipeline).toHaveBeenCalledTimes(1)
     })
 })

From ceb218a0502b6ac2f55d0cbe8d2d903c6a5ea2fd Mon Sep 17 00:00:00 2001
From: Ted Kaemming <65315+tkaemming@users.noreply.github.com>
Date: Wed, 3 Apr 2024 12:30:37 -0700
Subject: [PATCH 2/3] Better align tests with actual interfaces/behavior

---
 ...nalytics-events-ingestion-consumer.test.ts | 56 ++++++++++----------
 1 file changed, 28 insertions(+), 28 deletions(-)

diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts
index 3f224a1e86604..b5f163781ed23 100644
--- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts
@@ -1,3 +1,5 @@
+import { Message } from 'node-rdkafka'
+
 import { buildStringMatcher } from '../../../src/config/config'
 import { KAFKA_EVENTS_PLUGIN_INGESTION, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW } from '../../../src/config/kafka-topics'
 import {
@@ -53,14 +55,19 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
     let closeServer: () => Promise<void>
     let queue: any
 
-    function createBatchWithMultipleEventsWithKeys(events: any[], timestamp?: any): any {
-        return events.map((event) => ({
+    function makeMessageKey(event: any): string {
+        return event.token + ':' + event.distinct_id
+    }
+
+    function createBatchWithMultipleEvents(events: any[], timestamp?: any, withKey: boolean = true): Message[] {
+        return events.map((event, i) => ({
             partition: 0,
             topic: KAFKA_EVENTS_PLUGIN_INGESTION,
-            value: JSON.stringify(event),
+            value: Buffer.from(JSON.stringify(event)),
             timestamp,
-            offset: event.offset,
-            key: event.team_id + ':' + event.distinct_id,
+            offset: i,
+            key: withKey ? makeMessageKey(event) : null,
+            size: 0, // irrelevant, but needed for type checking
         }))
     }
 
@@ -79,30 +86,24 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
     })
 
     it('reroutes events with no key to OVERFLOW topic', async () => {
-        const batch = [
-            {
-                partition: 0,
-                topic: KAFKA_EVENTS_PLUGIN_INGESTION,
-                value: JSON.stringify(captureEndpointEvent1),
-                timestamp: captureEndpointEvent1['timestamp'],
-                offset: captureEndpointEvent1['offset'],
-                key: null,
-                token: 'ok',
-            },
-        ]
+        const now = Date.now()
+        const [message] = createBatchWithMultipleEvents(
+            [captureEndpointEvent1],
+            now,
+            false // act as if this message was intended to be routed to overflow by capture
+        )
+
         const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false)
         const produce = jest.spyOn(queue.pluginsServer.kafkaProducer, 'produce')
 
         const tokenBlockList = buildStringMatcher('another_token,more_token', false)
-        await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute)
+        await eachBatchParallelIngestion(tokenBlockList, [message], queue, IngestionOverflowMode.Reroute)
 
         expect(consume).not.toHaveBeenCalled()
         expect(captureIngestionWarning).not.toHaveBeenCalled()
         expect(produce).toHaveBeenCalledWith({
             topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
-            value: JSON.stringify(captureEndpointEvent1),
-            timestamp: captureEndpointEvent1['timestamp'],
-            offset: captureEndpointEvent1['offset'],
+            value: message.value,
             key: null,
             waitForAck: true,
         })
@@ -113,24 +114,23 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
 
     it('reroutes excess events to OVERFLOW topic', async () => {
         const now = Date.now()
-        const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1], now)
+        const event = captureEndpointEvent1
+        const [message] = createBatchWithMultipleEvents([event], now)
         const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false)
         const produce = jest.spyOn(queue.pluginsServer.kafkaProducer, 'produce')
 
         const tokenBlockList = buildStringMatcher('another_token,more_token', false)
-        await eachBatchParallelIngestion(tokenBlockList, batch, queue, IngestionOverflowMode.Reroute)
+        await eachBatchParallelIngestion(tokenBlockList, [message], queue, IngestionOverflowMode.Reroute)
 
         expect(consume).toHaveBeenCalledWith(
-            captureEndpointEvent1['token'] + ':' + captureEndpointEvent1['distinct_id'],
+            makeMessageKey(event), // NOTE: can't use ``message.key`` here as it will already have been mutated
             1,
             now
         )
         expect(captureIngestionWarning).not.toHaveBeenCalled()
         expect(produce).toHaveBeenCalledWith({
             topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
-            value: JSON.stringify(captureEndpointEvent1),
-            timestamp: captureEndpointEvent1['timestamp'],
-            offset: captureEndpointEvent1['offset'],
+            value: message.value,
             key: null,
             waitForAck: true,
         })
@@ -141,7 +141,7 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
 
     it('does not reroute if not over capacity limit', async () => {
         const now = Date.now()
-        const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent1, captureEndpointEvent2], now)
+        const batch = createBatchWithMultipleEvents([captureEndpointEvent1, captureEndpointEvent2], now)
         const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => true)
         const produce = jest.spyOn(queue.pluginsServer.kafkaProducer, 'produce')
 
@@ -166,7 +166,7 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
 
     it('does drop events from blocked tokens', async () => {
         const now = Date.now()
-        const batch = createBatchWithMultipleEventsWithKeys(
+        const batch = createBatchWithMultipleEvents(
            [captureEndpointEvent1, captureEndpointEvent2, captureEndpointEvent1],
             now
         )

From f13163edc602ba2442c5f88c9cd3e0e111c4e06a Mon Sep 17 00:00:00 2001
From: Ted Kaemming <65315+tkaemming@users.noreply.github.com>
Date: Wed, 3 Apr 2024 14:09:54 -0700
Subject: [PATCH 3/3] Just use `computeKey` which is already defined

---
 .../batch-processing/each-batch-ingestion.ts    | 2 +-
 .../analytics-events-ingestion-consumer.test.ts | 9 +++------
 2 files changed, 4 insertions(+), 7 deletions(-)

diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts
index 13d8761598a83..21844a660ceb9 100644
--- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts
+++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts
@@ -253,7 +253,7 @@ export async function eachBatchParallelIngestion(
     }
 }
 
-function computeKey(pluginEvent: PipelineEvent): string {
+export function computeKey(pluginEvent: PipelineEvent): string {
     return `${pluginEvent.team_id ?? pluginEvent.token}:${pluginEvent.distinct_id}`
 }
 
diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts
index b5f163781ed23..e036f7c6b0016 100644
--- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts
@@ -3,6 +3,7 @@ import { Message } from 'node-rdkafka'
 import { buildStringMatcher } from '../../../src/config/config'
 import { KAFKA_EVENTS_PLUGIN_INGESTION, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW } from '../../../src/config/kafka-topics'
 import {
+    computeKey,
     eachBatchParallelIngestion,
     IngestionOverflowMode,
 } from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion'
@@ -55,10 +56,6 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
     let closeServer: () => Promise<void>
     let queue: any
 
-    function makeMessageKey(event: any): string {
-        return event.token + ':' + event.distinct_id
-    }
-
     function createBatchWithMultipleEvents(events: any[], timestamp?: any, withKey: boolean = true): Message[] {
         return events.map((event, i) => ({
             partition: 0,
@@ -66,7 +63,7 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
             value: Buffer.from(JSON.stringify(event)),
             timestamp,
             offset: i,
-            key: withKey ? makeMessageKey(event) : null,
+            key: withKey ? computeKey(event) : null,
             size: 0, // irrelevant, but needed for type checking
         }))
     }
@@ -123,7 +120,7 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
         await eachBatchParallelIngestion(tokenBlockList, [message], queue, IngestionOverflowMode.Reroute)
 
         expect(consume).toHaveBeenCalledWith(
-            makeMessageKey(event), // NOTE: can't use ``message.key`` here as it will already have been mutated
+            computeKey(event), // NOTE: can't use ``message.key`` here as it will already have been mutated
             1,
             now
         )