Skip to content

Commit

Permalink
feat: ingest ML-supplied URL metadata with some prospects
Browse files Browse the repository at this point in the history
- this will skip using the Parser for URL metadata for these prospects
- a little bit of refactoring/cleanup
  • Loading branch information
jpetto committed Dec 16, 2024
1 parent 6262378 commit 814747b
Show file tree
Hide file tree
Showing 7 changed files with 399 additions and 73 deletions.
3 changes: 1 addition & 2 deletions lambdas/prospect-translation-lambda/src/events/snowplow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,13 @@ export const generateContext = (
*/
export const generateSnowplowEntity = (
prospect: Prospect,
prospectSource: string,
runDetails: ProspectRunDetails,
features: ProspectFeatures,
): SnowplowProspect => {
return {
object_version: 'new',
prospect_id: prospect.prospectId,
prospect_source: prospectSource,
prospect_source: prospect.prospectType,
scheduled_surface_id: prospect.scheduledSurfaceGuid,
url: prospect.url,
title: prospect.title,
Expand Down
88 changes: 82 additions & 6 deletions lambdas/prospect-translation-lambda/src/index.integration.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { setupServer } from 'msw/node';
import { Callback, Context } from 'aws-lambda';
import * as Sentry from '@sentry/serverless';

import config from './config';

Expand All @@ -19,6 +20,8 @@ import {
*/
describe('prospect api translation lambda entry function', () => {
const server = setupServer();
const captureConsoleSpy = jest.spyOn(console, 'log').mockImplementation();
const sentrySpy = jest.spyOn(Sentry, 'captureException').mockImplementation();

beforeAll(() => server.listen({ onUnhandledRequest: 'bypass' }));

Expand All @@ -28,12 +31,14 @@ describe('prospect api translation lambda entry function', () => {
});

afterEach(() => {
// restoreAllMocks restores all mocks and replaced properties. clearAllMocks only clears mocks.
jest.restoreAllMocks();
// clear all mock history
jest.clearAllMocks();
server.resetHandlers();
});

afterAll(() => {
// restore all mocks and replaced properties/methods
jest.restoreAllMocks();
server.close();
});

Expand Down Expand Up @@ -81,6 +86,7 @@ describe('prospect api translation lambda entry function', () => {

// Check that the right Snowplow entity was included with the event.
const goodEvents = await getGoodSnowplowEvents();

for (let i = 0; i < 2; i++) {
const snowplowContext = parseSnowplowData(
goodEvents[i].rawEvent.parameters.cx,
Expand Down Expand Up @@ -119,8 +125,6 @@ describe('prospect api translation lambda entry function', () => {
],
};

const captureConsoleSpy = jest.spyOn(console, 'log').mockImplementation();

await processor(fakePayload, sqsContext, sqsCallback);

expect(captureConsoleSpy).toHaveBeenCalledWith(
Expand Down Expand Up @@ -152,8 +156,6 @@ describe('prospect api translation lambda entry function', () => {
],
};

const captureConsoleSpy = jest.spyOn(console, 'log').mockImplementation();

await processor(fakePayload, sqsContext, sqsCallback);

expect(captureConsoleSpy).toHaveBeenCalledWith(
Expand All @@ -162,4 +164,78 @@ describe('prospect api translation lambda entry function', () => {

expect(captureConsoleSpy).toHaveBeenCalledWith(`1 prospects had errors.`);
});

it('accepts prospects with ML-supplied URL metadata', async () => {
const fakePayload = {
Records: [
{
messageId: '1',
receiptHandle: 'handle',
attributes: {
ApproximateReceiveCount: '1',
SentTimestamp: 'time',
SenderId: 'sender id',
ApproximateFirstReceiveTimestamp: 'time',
},
messageAttributes: {},
md5OfMessageAttributes: null,
md5OfBody: 'ab6181399b03008ffaada54b68c77574',
eventSource: 'aws:sqs',
eventSourceARN:
'arn:aws:sqs:us-east-1:996905175585:ProspectAPI-Prod-Sqs-Translation-Queue',
awsRegion: 'us-east-1',
body: '{"version":"0","id":"ab02d85b-4cb6-9de9-b549-b572166b278f","detail-type":"prospect-generation","source":"prospect-events","account":"996905175585","time":"2024-04-16T00:05:59Z","region":"us-east-1","resources":[],"detail":{"id":"c71504d1-f14f-4181-a654-730d5855ec48","version":3,"candidates":[{"scheduled_surface_guid": "NEW_TAB_DE_DE", "prospect_id": "447c90d2-1084-5f83-a585-26edfbf5640e", "url": "https://www.spektrum.de/news/ninetyeast-ridge-eines-der-laengsten-gebirge-liegt-tief-unter-dem-meer/2246024", "prospect_source": "QA_SPORTS", "save_count": 0, "predicted_topic": "", "rank": 117, "data_source": "prospect", "title": "Eines der l\u00e4ngsten Gebirge liegt tief unter dem Meer", "excerpt": "Die Bergkette wurde durch ein seltenes vulkanisches Ph\u00e4nomen gebildet", "language": "EN", "image_url": "https://static.spektrum.de/fm/912/f1920x1080/triplejunction_gis_2014_lrg.8200272.png", "authors": ["Daniel Lingenh\u00f6hl"]},{"scheduled_surface_guid": "NEW_TAB_DE_DE", "prospect_id": "faa0631f-3a43-555a-9947-7ce3de165a92", "url": "https://www.spiegel.de/ausland/wahlen-in-thailand-wie-eine-kandidatin-fuer-ein-demokratischeres-thailand-kaempft-a-1fa52682-ca7a-45f9-816a-a031ca0a7950", "prospect_source": "QA_SPORTS", "save_count": 10, "predicted_topic": "POLITICS", "rank": 118, "data_source": "prospect", "title": "Parlamentswahlen in Thailand Frau Thamnitinans Kampf gegen die Putschisten", "excerpt": "Sie vertritt als Anwältin Regimegegner. Nun will Sasinan Thamnitinan sich im Parlament für mehr Demokratie in Thailand einsetzen. Unterwegs im ersten Wahlkampf nach den landesweiten Massenprotesten gegen das Militär.", "language": "DE", "image_url": "https://cdn.prod.www.spiegel.de/images/06720642-d6e0-42ed-a1e1-1ded9f79d1bd_w1280_r1.77_fpx68_fpy93.jpg", "authors": ["Maria Stöhr, DER SPIEGEL"]}]}}',
},
],
};

await processor(fakePayload, sqsContext, sqsCallback);

expect(sentrySpy).toHaveBeenCalledTimes(0);

expect(captureConsoleSpy).toHaveBeenCalledWith(
`2 prospects inserted into dynamo.`,
);
});

it('should send errors to Sentry when ML-supplied URL metadata is invalid', async () => {
// this payload has three errors in the second/last prospect:
// - missing title
// - number given for authors
// - invalid language code
const fakePayload = {
Records: [
{
messageId: '1',
receiptHandle: 'handle',
attributes: {
ApproximateReceiveCount: '1',
SentTimestamp: 'time',
SenderId: 'sender id',
ApproximateFirstReceiveTimestamp: 'time',
},
messageAttributes: {},
md5OfMessageAttributes: null,
md5OfBody: 'ab6181399b03008ffaada54b68c77574',
eventSource: 'aws:sqs',
eventSourceARN:
'arn:aws:sqs:us-east-1:996905175585:ProspectAPI-Prod-Sqs-Translation-Queue',
awsRegion: 'us-east-1',
body: '{"version":"0","id":"ab02d85b-4cb6-9de9-b549-b572166b278f","detail-type":"prospect-generation","source":"prospect-events","account":"996905175585","time":"2024-04-16T00:05:59Z","region":"us-east-1","resources":[],"detail":{"id":"c71504d1-f14f-4181-a654-730d5855ec48","version":3,"candidates":[{"scheduled_surface_guid": "NEW_TAB_DE_DE", "prospect_id": "447c90d2-1084-5f83-a585-26edfbf5640e", "url": "https://www.spektrum.de/news/ninetyeast-ridge-eines-der-laengsten-gebirge-liegt-tief-unter-dem-meer/2246024", "prospect_source": "QA_ENTERTAINMENT", "save_count": 0, "predicted_topic": "", "rank": 117, "data_source": "prospect", "title": "Eines der l\u00e4ngsten Gebirge liegt tief unter dem Meer", "excerpt": "Die Bergkette wurde durch ein seltenes vulkanisches Ph\u00e4nomen gebildet", "language": "EN", "image_url": "https://static.spektrum.de/fm/912/f1920x1080/triplejunction_gis_2014_lrg.8200272.png", "authors": ["Daniel Lingenh\u00f6hl"]},{"scheduled_surface_guid": "NEW_TAB_DE_DE", "prospect_id": "faa0631f-3a43-555a-9947-7ce3de165a92", "url": "https://www.spiegel.de/ausland/wahlen-in-thailand-wie-eine-kandidatin-fuer-ein-demokratischeres-thailand-kaempft-a-1fa52682-ca7a-45f9-816a-a031ca0a7950", "prospect_source": "QA_ENTERTAINMENT", "save_count": 10, "predicted_topic": "POLITICS", "rank": 118, "data_source": "prospect", "image_url": 16, "excerpt": "Sie vertritt als Anwältin Regimegegner. Nun will Sasinan Thamnitinan sich im Parlament für mehr Demokratie in Thailand einsetzen. Unterwegs im ersten Wahlkampf nach den landesweiten Massenprotesten gegen das Militär.", "language": "BB", "authors": 42}]}}',
},
],
};

await processor(fakePayload, sqsContext, sqsCallback);

// authors, topic, and language are invalid in the test payload above
// - all should have triggered a Sentry call
expect(sentrySpy).toHaveBeenCalledTimes(3);

// the errors in ML-supplied URL metadata should *not* stop the prospects
// from being inserted!
expect(captureConsoleSpy).toHaveBeenCalledWith(
`2 prospects inserted into dynamo.`,
);
});
});
20 changes: 12 additions & 8 deletions lambdas/prospect-translation-lambda/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,18 @@ export const processor: SQSHandler = async (event: SQSEvent): Promise<void> => {
// now get the metadata and put it into dynamo
// this function will send an exception to sentry if any part of it
// fails.
prospectIdsProcessed = await processProspect(
prospect,
prospectIdsProcessed,
rawSqsProspect.prospect_source,
runDetails,
features,
tracker,
);
await processProspect(prospect, runDetails, features, tracker);

// an edge case we've hit before - ML was sending duplicate prospects in a
// single batch. we don't need to error here - dynamo will silently replace
// the existing entry. logging seems like the best approach for now.
if (prospectIdsProcessed.includes(prospect.id)) {
console.log(
`${prospect.id} is a duplicate in this ${prospect.scheduledSurfaceGuid} / ${prospect.prospectType} batch!`,
);
}

prospectIdsProcessed.push(prospect.id);
}
}
}
Expand Down
166 changes: 163 additions & 3 deletions lambdas/prospect-translation-lambda/src/lib.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
validateProperties,
validateStructure,
} from './lib';
import { SqsProspect, ProspectTypesWithMlUrlMetadata } from './types';

describe('lib', () => {
const captureExceptionSpy = jest
Expand Down Expand Up @@ -61,7 +62,7 @@ describe('lib', () => {
title: 'Test-Title',
publisher: 'test-publisher',
isCollection: false,
isSyndicated: true,
isSyndicated: false,
authors: 'questlove,rafael frumkin',
};

Expand Down Expand Up @@ -317,6 +318,165 @@ describe('lib', () => {
expect(result.topic).toEqual(expected.topic);
expect(result.saveCount).toEqual(expected.saveCount);
});

it('should create a prospect from an SqsProspect with ML-supplied metadata', () => {
const validSqsProspectWithMlData: SqsProspect = {
...validSqsProspect,
// pick any prospect type that has ML-supplied metadata
prospect_source: ProspectTypesWithMlUrlMetadata[0],
authors: ['Daniel Lingenh\u00f6hl'],
excerpt:
'Die Bergkette wurde durch ein seltenes vulkanisches Ph\u00e4nomen gebildet',
image_url:
'https://static.spektrum.de/fm/912/f1920x1080/triplejunction_gis_2014_lrg.8200272.png',
language: 'EN',
title: 'Eines der l\u00e4ngsten Gebirge liegt tief unter dem Meer',
};

const expected: Prospect = {
id: 'c3h5n3o9',
prospectId: validSqsProspectWithMlData.prospect_id,
scheduledSurfaceGuid: validSqsProspectWithMlData.scheduled_surface_guid,
url: validSqsProspectWithMlData.url,
prospectType: ProspectType[validSqsProspectWithMlData.prospect_source],
topic: Topics[validSqsProspectWithMlData.predicted_topic],
saveCount: validSqsProspectWithMlData.save_count,
rank: validSqsProspectWithMlData.rank,
authors: 'Daniel Lingenh\u00f6hl',
excerpt:
'Die Bergkette wurde durch ein seltenes vulkanisches Ph\u00e4nomen gebildet',
imageUrl:
'https://static.spektrum.de/fm/912/f1920x1080/triplejunction_gis_2014_lrg.8200272.png',
language: 'EN',
title: 'Eines der l\u00e4ngsten Gebirge liegt tief unter dem Meer',
};

const result = convertSqsProspectToProspect(validSqsProspectWithMlData);

expect(result.id).toBeDefined(); // we trust uuidV4 to work
expect(result.prospectId).toEqual(expected.prospectId);
expect(result.scheduledSurfaceGuid).toEqual(
expected.scheduledSurfaceGuid,
);
expect(result.url).toEqual(expected.url);
expect(result.prospectType).toEqual(expected.prospectType);
expect(result.topic).toEqual(expected.topic);
expect(result.saveCount).toEqual(expected.saveCount);
expect(result.authors).toEqual(expected.authors);
expect(result.excerpt).toEqual(expected.excerpt);
expect(result.imageUrl).toEqual(expected.imageUrl);
expect(result.language).toEqual(expected.language);
expect(result.title).toEqual(expected.title);
});

it('should emit Sentry errors when ML-supplied URL metadata is invalid', () => {
const validSqsProspectWithMlData: any = {
...validSqsProspect,
// pick any prospect type that has ML-supplied metadata
prospect_source: ProspectTypesWithMlUrlMetadata[0],
// invalid - should get thrown out
authors: 42,
excerpt:
'Die Bergkette wurde durch ein seltenes vulkanisches Ph\u00e4nomen gebildet',
// should get converted to a string
image_url: 16,
// invalid - should get thrown out
language: 'BB',
};

const expected: Prospect = {
id: 'c3h5n3o9',
prospectId: validSqsProspectWithMlData.prospect_id,
scheduledSurfaceGuid: validSqsProspectWithMlData.scheduled_surface_guid,
url: validSqsProspectWithMlData.url,
prospectType: ProspectType[validSqsProspectWithMlData.prospect_source],
topic: Topics[validSqsProspectWithMlData.predicted_topic],
saveCount: validSqsProspectWithMlData.save_count,
rank: validSqsProspectWithMlData.rank,
excerpt:
'Die Bergkette wurde durch ein seltenes vulkanisches Ph\u00e4nomen gebildet',
imageUrl: '16',
};

const result = convertSqsProspectToProspect(validSqsProspectWithMlData);

expect(result.id).toBeDefined(); // we trust uuidV4 to work
expect(result.prospectId).toEqual(expected.prospectId);
expect(result.scheduledSurfaceGuid).toEqual(
expected.scheduledSurfaceGuid,
);
expect(result.url).toEqual(expected.url);
expect(result.prospectType).toEqual(expected.prospectType);
expect(result.topic).toEqual(expected.topic);
expect(result.saveCount).toEqual(expected.saveCount);
expect(result.authors).toEqual(undefined);
expect(result.excerpt).toEqual(expected.excerpt);
expect(result.imageUrl).toEqual(expected.imageUrl);
expect(result.language).toEqual(undefined);
expect(result.title).toEqual(undefined);

expect(captureExceptionSpy).toHaveBeenCalledTimes(3);

expect(captureExceptionSpy).toHaveBeenNthCalledWith(
1,
`Invalid ML supplied value for 'authors': 42`,
);

expect(captureExceptionSpy).toHaveBeenNthCalledWith(
2,
`Invalid ML supplied value for 'title': undefined`,
);

expect(captureExceptionSpy).toHaveBeenNthCalledWith(
3,
`Invalid ML supplied value for 'language': BB`,
);
});

it('should not use ML-supplied URL metadata if the prospect is not configured to do so', () => {
const validSqsProspectWithMlData: any = {
...validSqsProspect,
// pick any prospect type NOT configured to use ML-supplied metadata
prospect_source: ProspectType.RSS_LOGISTIC,
authors: ['Daniel Lingenh\u00f6hl'],
excerpt:
'Die Bergkette wurde durch ein seltenes vulkanisches Ph\u00e4nomen gebildet',
image_url:
'https://static.spektrum.de/fm/912/f1920x1080/triplejunction_gis_2014_lrg.8200272.png',
language: 'EN',
title: 'Eines der l\u00e4ngsten Gebirge liegt tief unter dem Meer',
};

const expected: Prospect = {
id: 'c3h5n3o9',
prospectId: validSqsProspectWithMlData.prospect_id,
scheduledSurfaceGuid: validSqsProspectWithMlData.scheduled_surface_guid,
url: validSqsProspectWithMlData.url,
prospectType: ProspectType[validSqsProspectWithMlData.prospect_source],
topic: Topics[validSqsProspectWithMlData.predicted_topic],
saveCount: validSqsProspectWithMlData.save_count,
rank: validSqsProspectWithMlData.rank,
};

const result = convertSqsProspectToProspect(validSqsProspectWithMlData);

expect(result.id).toBeDefined(); // we trust uuidV4 to work
expect(result.prospectId).toEqual(expected.prospectId);
expect(result.scheduledSurfaceGuid).toEqual(
expected.scheduledSurfaceGuid,
);
expect(result.url).toEqual(expected.url);
expect(result.prospectType).toEqual(expected.prospectType);
expect(result.topic).toEqual(expected.topic);
expect(result.saveCount).toEqual(expected.saveCount);
expect(result.authors).toEqual(undefined);
expect(result.excerpt).toEqual(undefined);
expect(result.imageUrl).toEqual(undefined);
expect(result.language).toEqual(undefined);
expect(result.title).toEqual(undefined);

expect(captureExceptionSpy).toHaveBeenCalledTimes(0);
});
});

describe('hydrateProspectMetaData', () => {
Expand Down Expand Up @@ -388,8 +548,8 @@ describe('lib', () => {
language: undefined,
title: undefined,
publisher: undefined,
isCollection: undefined,
isSyndicated: undefined,
isCollection: false,
isSyndicated: false,
authors: undefined,
};

Expand Down
Loading

0 comments on commit 814747b

Please sign in to comment.