diff --git a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts index 2c5637726743e..78e8202cc2ed8 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts @@ -155,12 +155,16 @@ export const parseKafkaMessage = async ( const headerResult = await readTokenFromHeaders(message.headers, getTeamFn) const token: string | undefined = headerResult.token - let teamIdWithConfig: null | TeamIDWithConfig = headerResult.teamIdWithConfig + const teamIdWithConfig: null | TeamIDWithConfig = headerResult.teamIdWithConfig + + if (!token) { + return dropMessage('no_token_in_header') + } // NB `==` so we're comparing undefined and null // if token was in the headers but, we could not load team config // then, we can return early - if (!!token && (teamIdWithConfig == null || teamIdWithConfig.teamId == null)) { + if (teamIdWithConfig == null || teamIdWithConfig.teamId == null) { return dropMessage('header_token_present_team_missing_or_disabled', { token: token, }) @@ -183,31 +187,6 @@ export const parseKafkaMessage = async ( return dropMessage('received_non_snapshot_message') } - // TODO this mechanism is deprecated for blobby ingestion, we should remove it - // once we're happy that the new mechanism is working - // if there was not a token in the header then we try to load one from the message payload - if (teamIdWithConfig == null && messagePayload.team_id == null && !messagePayload.token) { - return dropMessage('no_token_in_header_or_payload') - } - - if (teamIdWithConfig == null) { - const token = messagePayload.token - - if (token) { - teamIdWithConfig = await getTeamFn(token) - } - } - - // NB `==` so we're comparing undefined and null - if (teamIdWithConfig == null || teamIdWithConfig.teamId == null) { - return dropMessage('token_fallback_team_missing_or_disabled', { - token: messagePayload.token, - teamId: messagePayload.team_id, - payloadTeamSource: messagePayload.team_id ? 'team' : messagePayload.token ? 'token' : 'unknown', - }) - } - // end of deprecated mechanism - const events: RRWebEvent[] = $snapshot_items.filter((event: any) => { // we sometimes see events that are null // there will always be some unexpected data but, we should try to filter out the worst of it diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts b/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts index 59a4b2e250dea..79602befd50f3 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts @@ -13,7 +13,7 @@ export function createIncomingRecordingMessage( // that has properties, and they have $snapshot_data // that will have data_items, which are the actual snapshots each individually compressed - const message: IncomingRecordingMessage = { + return { team_id: 1, distinct_id: 'distinct_id', session_id: 'session_id_1', @@ -33,12 +33,11 @@ export function createIncomingRecordingMessage( lowOffset: 1, highOffset: 1, timestamp: 1, + rawSize: 1, ...partialIncomingMessage.metadata, ...partialMetadata, }, } - - return message } export function createKafkaMessage( @@ -46,12 +45,13 @@ export function createKafkaMessage( messageOverrides: Partial = {}, eventProperties: Record = {} ): Message { - const message: Message = { + return { partition: 1, topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, offset: 0, timestamp: messageOverrides.timestamp ?? Date.now(), size: 1, + headers: [{ token: token.toString() }], ...messageOverrides, value: Buffer.from( @@ -70,8 +70,6 @@ export function createKafkaMessage( }) ), } - - return message } export function createTP(partition: number, topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS) { diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts index c5a3851486d93..28c594566eb15 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/utils.test.ts @@ -65,14 +65,15 @@ describe('session-recording utils', () => { describe('parsing the message', () => { it('can parse a message correctly', async () => { - const parsedMessage = await parseKafkaMessage(validMessage('my-distinct-id'), () => - Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }) + const parsedMessage = await parseKafkaMessage( + validMessage('my-distinct-id', [{ token: 'something' }]), + () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }) ) expect(parsedMessage).toMatchSnapshot() }) it('can handle numeric distinct_ids', async () => { const numericId = 12345 - const parsedMessage = await parseKafkaMessage(validMessage(numericId), () => + const parsedMessage = await parseKafkaMessage(validMessage(numericId, [{ token: 'something' }]), () => Promise.resolve({ teamId: 1, consoleLogIngestionEnabled: false }) ) expect(parsedMessage).toMatchObject({ @@ -91,6 +92,7 @@ describe('session-recording utils', () => { const createMessage = ($snapshot_items: unknown[]) => { return { + headers: [{ token: 'the_token' }], value: Buffer.from( JSON.stringify({ uuid: '018a47df-a0f6-7761-8635-439a0aa873bb', @@ -191,10 +193,10 @@ describe('session-recording utils', () => { undefined, ], [ - 'calls the team id resolver twice when token is not in header, and is in body', + 'does not call the team id resolver when token is not in header, but is in body', undefined, 'the body token', - ['the body token'], + undefined, ], ])('%s', async (_name, headerToken, payloadToken, expectedCalls) => { await parseKafkaMessage(