-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(cdp): add brevo destination (#26195)
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
- Loading branch information
1 parent
e92e4a8
commit b562836
Showing
4 changed files
with
129 additions
and
0 deletions.
There are no files selected for viewing
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
from posthog.cdp.templates.hog_function_template import HogFunctionTemplate | ||
|
||
template: HogFunctionTemplate = HogFunctionTemplate( | ||
status="beta", | ||
type="destination", | ||
id="template-brevo", | ||
name="Brevo", | ||
description="Update contacts in Brevo", | ||
icon_url="/static/services/brevo.png", | ||
category=["Email Marketing"], | ||
hog=""" | ||
if (empty(inputs.email)) { | ||
print('No email set. Skipping...') | ||
return | ||
} | ||
let body := { | ||
'email': inputs.email, | ||
'updateEnabled': true, | ||
'attributes': {} | ||
} | ||
for (let key, value in inputs.attributes) { | ||
if (not empty(value)) { | ||
body.attributes[key] := value | ||
} | ||
} | ||
let res := fetch(f'https://api.brevo.com/v3/contacts', { | ||
'method': 'POST', | ||
'headers': { | ||
'api-key': inputs.apiKey, | ||
'Content-Type': 'application/json', | ||
}, | ||
'body': body | ||
}) | ||
if (res.status >= 400) { | ||
throw Error(f'Error from api.brevo.com (status {res.status}): {res.body}') | ||
} | ||
""".strip(), | ||
inputs_schema=[ | ||
{ | ||
"key": "apiKey", | ||
"type": "string", | ||
"label": "Brevo API Key", | ||
"description": "Check out this page on how to get your API key: https://help.brevo.com/hc/en-us/articles/209467485-Create-and-manage-your-API-keys", | ||
"secret": True, | ||
"required": True, | ||
}, | ||
{ | ||
"key": "email", | ||
"type": "string", | ||
"label": "Email of the user", | ||
"description": "Where to find the email for the contact to be created. You can use the filters section to filter out unwanted emails or internal users.", | ||
"default": "{person.properties.email}", | ||
"secret": False, | ||
"required": True, | ||
}, | ||
{ | ||
"key": "attributes", | ||
"type": "dictionary", | ||
"label": "Attributes", | ||
"description": "For information on potential attributes, refer to the following page: https://help.brevo.com/hc/en-us/articles/10617359589906-Create-and-manage-contact-attributes", | ||
"default": { | ||
"EMAIL": "{person.properties.email}", | ||
"FIRSTNAME": "{person.properties.firstname}", | ||
"LASTNAME": "{person.properties.lastname}", | ||
}, | ||
"secret": False, | ||
"required": True, | ||
}, | ||
], | ||
filters={ | ||
"events": [ | ||
{"id": "$identify", "name": "$identify", "type": "events", "order": 0}, | ||
{"id": "$set", "name": "$set", "type": "events", "order": 0}, | ||
], | ||
"actions": [], | ||
"filter_test_accounts": True, | ||
}, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
from inline_snapshot import snapshot | ||
from posthog.cdp.templates.helpers import BaseHogFunctionTemplateTest | ||
from posthog.cdp.templates.brevo.template_brevo import ( | ||
template as template_brevo, | ||
) | ||
|
||
|
||
def create_inputs(**kwargs): | ||
inputs = { | ||
"apiKey": "apikey12345", | ||
"email": "[email protected]", | ||
"attributes": {"EMAIL": "[email protected]", "FIRSTNAME": "Max"}, | ||
} | ||
inputs.update(kwargs) | ||
|
||
return inputs | ||
|
||
|
||
class TestTemplateBrevo(BaseHogFunctionTemplateTest): | ||
template = template_brevo | ||
|
||
def test_function_works(self): | ||
self.run_function(inputs=create_inputs()) | ||
assert self.get_mock_fetch_calls()[0] == snapshot( | ||
( | ||
"https://api.brevo.com/v3/contacts", | ||
{ | ||
"method": "POST", | ||
"headers": { | ||
"api-key": "apikey12345", | ||
"Content-Type": "application/json", | ||
}, | ||
"body": { | ||
"email": "[email protected]", | ||
"updateEnabled": True, | ||
"attributes": {"EMAIL": "[email protected]", "FIRSTNAME": "Max"}, | ||
}, | ||
}, | ||
) | ||
) | ||
|
||
def test_function_requires_identifier(self): | ||
self.run_function(inputs=create_inputs(email="")) | ||
|
||
assert not self.get_mock_fetch_calls() | ||
assert self.get_mock_print_calls() == snapshot([("No email set. Skipping...",)]) |