Skip to content

Commit

Permalink
feat(cdp): hog transformations for geoip (#27646)
Browse files Browse the repository at this point in the history
Co-authored-by: Ben White <[email protected]>
  • Loading branch information
meikelmosby and benjackwhite authored Jan 17, 2025
1 parent 0efdbdc commit 24e9d94
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 56 deletions.
40 changes: 6 additions & 34 deletions plugin-server/src/cdp/hog-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,10 @@ export class HogExecutor {
}
}

execute(invocation: HogFunctionInvocation): HogFunctionInvocationResult {
execute(
invocation: HogFunctionInvocation,
options: { functions?: Record<string, (args: unknown[]) => unknown> } = {}
): HogFunctionInvocationResult {
const loggingContext = {
invocationId: invocation.id,
hogFunctionId: invocation.hogFunction.id,
Expand Down Expand Up @@ -458,46 +461,14 @@ export class HogExecutor {

try {
let hogLogs = 0

execRes = execHog(invocationInput, {
globals: invocation.functionToExecute ? undefined : globals,
maxAsyncSteps: MAX_ASYNC_STEPS, // NOTE: This will likely be configurable in the future
asyncFunctions: {
// We need to pass these in but they don't actually do anything as it is a sync exec
fetch: async () => Promise.resolve(),
},
// importBytecode: (module) => {
// // TODO: more than one hardcoded module
// if (module === 'provider/email') {
// const provider = this.hogFunctionManager.getTeamHogEmailProvider(invocation.teamId)
// if (!provider) {
// throw new Error('No email provider configured')
// }
// try {
// const providerGlobals = this.buildHogFunctionGlobals({
// id: '',
// teamId: invocation.teamId,
// hogFunction: provider,
// globals: {} as any,
// queue: 'hog',
// timings: [],
// priority: 0,
// } satisfies HogFunctionInvocation)

// return {
// bytecode: provider.bytecode,
// globals: providerGlobals,
// }
// } catch (e) {
// result.logs.push({
// level: 'error',
// timestamp: DateTime.now(),
// message: `Error building inputs: ${e}`,
// })
// throw e
// }
// }
// throw new Error(`Can't import unknown module: ${module}`)
// },
functions: {
print: (...args) => {
hogLogs++
Expand Down Expand Up @@ -552,6 +523,7 @@ export class HogExecutor {
},
})
},
...(options.functions ?? {}),
},
})
if (execRes.error) {
Expand Down
52 changes: 52 additions & 0 deletions plugin-server/src/cdp/hog-function-manager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { Hub } from '../types'
import { HogFunctionManager } from './hog-function-manager'

describe('HogFunctionManager', () => {
let hub: Hub
let manager: HogFunctionManager

beforeEach(() => {
hub = {
mmdb: undefined, // No MMDB configured
postgres: {
query: jest.fn().mockResolvedValue({ rows: [] }),
transaction: jest.fn(),
},
CELERY_DEFAULT_QUEUE: 'celery-default',
PLUGINS_CELERY_QUEUE: 'plugins-celery',
OBJECT_STORAGE_ENABLED: true,
OBJECT_STORAGE_REGION: '',
OBJECT_STORAGE_ENDPOINT: '',
OBJECT_STORAGE_ACCESS_KEY_ID: '',
OBJECT_STORAGE_SECRET_ACCESS_KEY: '',
OBJECT_STORAGE_BUCKET: '',
statsd: {
timing: jest.fn(),
increment: jest.fn(),
gauge: jest.fn(),
close: jest.fn(),
},
instanceId: 'test',
capabilities: {},
} as any as Hub

manager = new HogFunctionManager(hub)
})

describe('start()', () => {
it('should fail if transformations are enabled but MMDB is not configured', async () => {
await expect(manager.start(['transformation'])).rejects.toThrow(
'GeoIP transformation requires MMDB to be configured. Please ensure the MMDB file is properly set up.'
)
})

it('should start successfully if MMDB is configured', async () => {
hub.mmdb = {} as any // Mock MMDB as configured
await expect(manager.start(['transformation'])).resolves.not.toThrow()
})

it('should start successfully if transformations are not enabled', async () => {
await expect(manager.start(['destination'])).resolves.not.toThrow()
})
})
})
1 change: 1 addition & 0 deletions plugin-server/src/cdp/hog-function-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export class HogFunctionManager {
public async start(hogTypes: HogFunctionTypeType[]): Promise<void> {
this.hogTypes = hogTypes
// TRICKY - when running with individual capabilities, this won't run twice but locally or as a complete service it will...

if (this.started) {
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,214 @@ import { HogFunctionInvocationGlobals } from '../../../types'
import { TemplateTester } from '../../test/test-helpers'
import { template } from './geoip.template'

describe('geoip template', () => {
describe('geoip.template', () => {
const tester = new TemplateTester(template)

const mockGeoipLookup = jest.fn()
let mockGlobals: HogFunctionInvocationGlobals

beforeEach(async () => {
mockGeoipLookup.mockReturnValue({
city: { names: { en: 'Sydney' } },
country: { names: { en: 'Australia' } },
})
await tester.beforeEach()
jest.useFakeTimers().setSystemTime(new Date('2025-01-01'))
})

it('should enrich event with IP location', async () => {
mockGlobals = tester.createGlobals({
event: {
properties: {
$ip: '127.0.0.1',
$ip: '89.160.20.129',
},
},
})

const response = await tester.invoke({}, mockGlobals)

expect(response.finished).toBe(true)
expect(response.error).toBeUndefined()
expect(response.execResult).toMatchInlineSnapshot(`
{
"distinct_id": "distinct-id",
"elements_chain": "",
"event": "event-name",
"properties": {
"$geoip_accuracy_radius": 76,
"$geoip_city_name": "Linköping",
"$geoip_continent_code": "EU",
"$geoip_continent_name": "Europe",
"$geoip_country_code": "SE",
"$geoip_country_name": "Sweden",
"$geoip_latitude": 58.4167,
"$geoip_longitude": 15.6167,
"$geoip_subdivision_1_code": "E",
"$geoip_subdivision_1_name": "Östergötland County",
"$geoip_time_zone": "Europe/Stockholm",
"$ip": "89.160.20.129",
"$set": {
"$geoip_accuracy_radius": 76,
"$geoip_city_confidence": null,
"$geoip_city_name": "Linköping",
"$geoip_continent_code": "EU",
"$geoip_continent_name": "Europe",
"$geoip_country_code": "SE",
"$geoip_country_name": "Sweden",
"$geoip_latitude": 58.4167,
"$geoip_longitude": 15.6167,
"$geoip_postal_code": null,
"$geoip_subdivision_1_code": "E",
"$geoip_subdivision_1_name": "Östergötland County",
"$geoip_subdivision_2_code": null,
"$geoip_subdivision_2_name": null,
"$geoip_time_zone": "Europe/Stockholm",
},
"$set_once": {
"$initial_geoip_accuracy_radius": 76,
"$initial_geoip_city_confidence": null,
"$initial_geoip_city_name": "Linköping",
"$initial_geoip_continent_code": "EU",
"$initial_geoip_continent_name": "Europe",
"$initial_geoip_country_code": "SE",
"$initial_geoip_country_name": "Sweden",
"$initial_geoip_latitude": 58.4167,
"$initial_geoip_longitude": 15.6167,
"$initial_geoip_postal_code": null,
"$initial_geoip_subdivision_1_code": "E",
"$initial_geoip_subdivision_1_name": "Östergötland County",
"$initial_geoip_subdivision_2_code": null,
"$initial_geoip_subdivision_2_name": null,
"$initial_geoip_time_zone": "Europe/Stockholm",
},
},
"timestamp": "2024-01-01T00:00:00Z",
"url": "https://us.posthog.com/projects/1/events/1234",
"uuid": "event-id",
}
`)
})

xit('should invoke the function', async () => {
it('should enrich person with IP location', async () => {
mockGlobals = tester.createGlobals({
event: {
properties: {
$ip: '89.160.20.129',
},
},
})

const response = await tester.invoke({}, mockGlobals)
// TODO: Add support for the hog function exector to do the geoip lookup stuff

expect(response.finished).toBe(true)
expect(response.error).toBeUndefined()
// TODO: Add the response to the hog executor
expect(response.execResult).toMatchInlineSnapshot()
// Check $set properties
expect((response.execResult as any).properties.$set).toEqual(
expect.objectContaining({
$geoip_city_name: 'Linköping',
$geoip_country_name: 'Sweden',
$geoip_country_code: 'SE',
$geoip_continent_name: 'Europe',
$geoip_continent_code: 'EU',
$geoip_latitude: 58.4167,
$geoip_longitude: 15.6167,
$geoip_time_zone: 'Europe/Stockholm',
$geoip_subdivision_1_code: 'E',
$geoip_subdivision_1_name: 'Östergötland County',
})
)

// Check $set_once properties
expect((response.execResult as any).properties.$set_once).toEqual(
expect.objectContaining({
$initial_geoip_city_name: 'Linköping',
$initial_geoip_country_name: 'Sweden',
$initial_geoip_country_code: 'SE',
$initial_geoip_continent_name: 'Europe',
$initial_geoip_continent_code: 'EU',
$initial_geoip_latitude: 58.4167,
$initial_geoip_longitude: 15.6167,
$initial_geoip_time_zone: 'Europe/Stockholm',
$initial_geoip_subdivision_1_code: 'E',
$initial_geoip_subdivision_1_name: 'Östergötland County',
})
)
})

it('should set properties to null if no values present', async () => {
// First call beforeEach with a transform function to remove city data
await tester.beforeEach((res) => {
const { city, ...remainingResult } = res
return remainingResult
})

// Then create the mock globals and run the test
mockGlobals = tester.createGlobals({
event: {
properties: {
$ip: '89.160.20.129',
},
},
})

const response = await tester.invoke({}, mockGlobals)

expect(response.finished).toBe(true)
expect(response.error).toBeUndefined()

expect((response.execResult as any).properties.$set).toMatchInlineSnapshot(`
{
"$geoip_accuracy_radius": 76,
"$geoip_city_confidence": null,
"$geoip_city_name": null,
"$geoip_continent_code": "EU",
"$geoip_continent_name": "Europe",
"$geoip_country_code": "SE",
"$geoip_country_name": "Sweden",
"$geoip_latitude": 58.4167,
"$geoip_longitude": 15.6167,
"$geoip_postal_code": null,
"$geoip_subdivision_1_code": "E",
"$geoip_subdivision_1_name": "Östergötland County",
"$geoip_subdivision_2_code": null,
"$geoip_subdivision_2_name": null,
"$geoip_time_zone": "Europe/Stockholm",
}
`)

expect((response.execResult as any).properties.$set_once).toMatchInlineSnapshot(`
{
"$initial_geoip_accuracy_radius": 76,
"$initial_geoip_city_confidence": null,
"$initial_geoip_city_name": null,
"$initial_geoip_continent_code": "EU",
"$initial_geoip_continent_name": "Europe",
"$initial_geoip_country_code": "SE",
"$initial_geoip_country_name": "Sweden",
"$initial_geoip_latitude": 58.4167,
"$initial_geoip_longitude": 15.6167,
"$initial_geoip_postal_code": null,
"$initial_geoip_subdivision_1_code": "E",
"$initial_geoip_subdivision_1_name": "Östergötland County",
"$initial_geoip_subdivision_2_code": null,
"$initial_geoip_subdivision_2_name": null,
"$initial_geoip_time_zone": "Europe/Stockholm",
}
`)
})

it('should skip processing when $geoip_disable is true', async () => {
mockGlobals = tester.createGlobals({
event: {
properties: {
$ip: '89.160.20.129',
$geoip_disable: true,
},
},
})

const response = await tester.invoke({}, mockGlobals)

expect(response.finished).toBe(true)
expect(response.error).toBeUndefined()
// Verify the event was not modified
expect((response.execResult as any).properties).toEqual({
$ip: '89.160.20.129',
$geoip_disable: true,
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,18 @@ if (event.properties?.$geoip_disable or empty(event.properties?.$ip)) {
}
let ip := event.properties.$ip
if (ip == '127.0.0.1') {
ip := '13.106.122.3' // Spoofing an Australian IP address for local development
print('spoofing ip for local development', ip)
ip := '89.160.20.129'
}
let response := geoipLookup(ip)
print(response)
if (not response) {
print('geoip lookup failed for ip', ip)
return event
}
let location := {}
if (response.city) {
location['city_name'] := response.city.names?.en
}
print(location)
if (response.country) {
location['country_name'] := response.country.names?.en
location['country_code'] := response.country.isoCode
Expand All @@ -69,6 +69,7 @@ if (response.subdivisions) {
location[f'subdivision_{index + 1}_name'] := subdivision.names?.en
}
}
print('geoip location data for ip:', location)
let returnEvent := event
returnEvent.properties := returnEvent.properties ?? {}
returnEvent.properties.$set := returnEvent.properties.$set ?? {}
Expand Down
Loading

0 comments on commit 24e9d94

Please sign in to comment.