Skip to content

Commit

Permalink
HGI-6307: Add lists and tags behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
hsyyid committed Aug 14, 2024
1 parent 1375dbc commit 93a16c8
Showing 1 changed file with 77 additions and 4 deletions.
81 changes: 77 additions & 4 deletions target_salesforce_v3/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class ContactsSink(SalesforceV3Sink):
unified_schema = Contact
name = Contact.Stream.name
campaigns = None
topics = None
contact_type = "Contact"
available_names = ["contacts", "customers"]

Expand Down Expand Up @@ -124,8 +125,15 @@ def preprocess_record(self, record: dict, context: dict):
mapping.update({"Id":id})
lookup_field = f"Id = '{id}'"

# We map tags => topics in Salesforce
if record.get('tags'):
self.topics = record['tags']

# We map campaigns => campaigns in Salesforce
if record.get('campaigns'):
self.campaigns = record['campaigns']

# We map lists => campaigns in Salesforce
if not self.campaigns and record.get("lists"):
self.campaigns = [{"name": list_item} for list_item in record.get("lists")]

Expand Down Expand Up @@ -226,6 +234,10 @@ def upsert_record(self, record, context):
# Check for campaigns to be added
if self.campaigns:
self.assign_to_campaign(id,self.campaigns)

if self.topics:
self.assign_to_topic(id,self.topics)

return id, True, state_updates
except Exception as e:
self.logger.exception(f"Could not PATCH to {url}: {e}")
Expand All @@ -237,6 +249,9 @@ def upsert_record(self, record, context):
# Check for campaigns to be added
if self.campaigns:
self.assign_to_campaign(id,self.campaigns)

if self.topics:
self.assign_to_topic(id,self.topics)
return id, True, state_updates
except Exception as e:
self.logger.exception("Error while attempting to create Contact")
Expand All @@ -262,6 +277,53 @@ def validate_response(self, response):
msg = self.response_error_message(response)
raise FatalAPIError(msg)

def assign_to_topic(self,contact_id,topics:list) -> None:
"""
This function recieves a contact_id and a list of topics and assigns the contact_id to each topic
Input:
contact_id : str
topics : list[str]
"""

for topic in topics:
# data = self.get_query(endpoint=f"sobjects/Campaign/Name/{campaign.get('name')}")
data = self.query_sobject(
query = f"SELECT Id, CreatedDate from Topic WHERE Name = '{topic}' ORDER BY CreatedDate ASC",
fields = ['Id']
)
# Extract topic id from record
if not data:
self.logger.info(f"No topic found with Name = '{topic}'\nCreating topic ...")
# Create the topic since it doesn't exist
response = self.request_api("POST", endpoint="sobjects/Topic", request_data={
"Name": topic,
})
id = response.json().get("id")
self.logger.info(f"{self.name} created with id: {id}")
topic_id = id
else:
topic_id = data[0]['Id']

# Add the contact to the topic
self.logger.info(f"INFO: Adding Contact/Lead Id:[{contact_id}] to Topic Id:[{topic_id}].")

try:
response = self.request_api("POST",endpoint="sobjects/TopicAssignment",request_data={
"EntityId": contact_id,
"TopicId": topic_id
})

data = response.json()
self.logger.info(f"Added TopicAssignment")
except Exception as e:
# Means it's already in the topic
if "DUPLICATE_VALUE" in str(e):
return

self.logger.exception("Error encountered while creating TopicAssignment")
raise e

def assign_to_campaign(self,contact_id,campaigns:list) -> None:
"""
This function recieves a contact_id and a list of campaigns and assigns the contact_id to each campaign
Expand All @@ -283,9 +345,16 @@ def assign_to_campaign(self,contact_id,campaigns:list) -> None:
)
# Extract capaign id from record
if not data:
self.logger.info(f"No Campaign found with Name = '{campaign.get('name')}'\nSkipping campaign ...")
continue
campaign['campaign_id'] = data[0]['Id']
self.logger.info(f"No Campaign found with Name = '{campaign.get('name')}'\nCreating campaign ...")
# Create the campaign since it doesn't exist
response = self.request_api("POST", endpoint="sobjects/Campaign", request_data={
"Name": campaign.get("name"),
})
id = response.json().get("id")
self.logger.info(f"{self.name} created with id: {id}")
campaign['campaign_id'] = id
else:
campaign['campaign_id'] = data[0]['Id']

# Assigns the customer_id to the campaign_id or lead_id
mapping = {"CampaignId": campaign.get("campaign_id") or campaign.get("id")}
Expand All @@ -299,8 +368,12 @@ def assign_to_campaign(self,contact_id,campaigns:list) -> None:

try:
response = self.request_api("POST",endpoint="sobjects/CampaignMember",request_data=mapping)
data = response.json()

id = response.json().get("id")
if isinstance(data, list) and data[0].get("message") == "Already a campaign member.":
return

id = data.get("id")
self.logger.info(f"CampaignMember created with id: {id}")
except Exception as e:
self.logger.exception("Error encountered while creating CampaignMember")
Expand Down

0 comments on commit 93a16c8

Please sign in to comment.