Skip to content

Commit

Permalink
Bump lookup_fields for unified sinks, support list
Browse files Browse the repository at this point in the history
  • Loading branch information
keyn4 committed Jul 26, 2024
1 parent 1375dbc commit 7fd15f0
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 56 deletions.
29 changes: 24 additions & 5 deletions target_salesforce_v3/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ def http_headers(self) -> dict:
if "user_agent" in self.config:
headers["User-Agent"] = self.config.get("user_agent")
return headers

@property
def lookup_fields_dict(self):
return self.config.get("lookup_fields") or {}

def get_fields_for_object(self, object_type):
"""Check if Salesforce has an object type and fetches its fields."""
Expand Down Expand Up @@ -427,11 +431,26 @@ def update_field_permissions(self,permission_set_id, sobject_type, field_name):
self.logger.info(f"Field permission for {field_name} updated for permission set {permission_set_id}, response: {response.text}")


def map_only_empty_fields(self, mapping, sobject_name, lookup_field):
fields = ",".join([field for field in mapping.keys()])
def map_only_empty_fields(self, mapping, sobject_name, lookup_filter):
fields = [field for field in mapping.keys()]
if "Id" not in fields:
fields.append("Id")
fields = ",".join(fields)
data = self.query_sobject(
query = f"SELECT {fields} from {sobject_name} WHERE {lookup_field}",
query = f"SELECT {fields} from {sobject_name} WHERE {lookup_filter}",
)
if data:
mapping = {k:v for k,v in mapping.items() if not data[0].get(k) or k == "Id"}
return mapping
existing_record = data[0]
mapping.update({"Id": existing_record["Id"]})
mapping = {k:v for k,v in mapping.items() if not existing_record.get(k) or k == "Id"}
return mapping


def get_lookup_filter(self, lookup_values, method):
conditions = []
if lookup_values:
query_operator = "AND" if method == "all" else "OR"
for field, value in lookup_values.items():
conditions.append(f"{field} = '{value}'")

return f" {query_operator} ".join(conditions)
178 changes: 127 additions & 51 deletions target_salesforce_v3/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def reference_data(self):
return [{k: v for k, v in r.items() if k in ["Id", "Name"]} for r in response]

def preprocess_record(self, record: dict, context: dict):

# 1. Map and process record
# Parse data
if isinstance(record.get("addresses"), str):
record["addresses"] = json.loads(record["addresses"])
Expand Down Expand Up @@ -100,30 +100,6 @@ def preprocess_record(self, record: dict, context: dict):
elif self.contact_type == "Lead":
mapping.update({"Company": record.get("company_name")})


# check if record already exists
lookup_field = None
if record.get('id'):
# If contact has an Id will use it to updatev
mapping.update({"Id": record['id']})
lookup_field = f"Id = '{record['id']}'"
elif record.get("external_id"):
external_id = record["external_id"]
mapping[external_id["name"]] = external_id["value"]
lookup_field = f"{external_id['name']} = '{external_id['value']}'"
else:
# If no Id we'll use email to search for an existing record
if record.get('email'):
# Get contact_id based on email
data = self.query_sobject(
query = f"SELECT Name, Id from {self.contact_type} WHERE Email = '{record.get('email')}'",
fields = ['Name', 'Id']
)
if data:
id = data[0].get("Id")
mapping.update({"Id":id})
lookup_field = f"Id = '{id}'"

if record.get('campaigns'):
self.campaigns = record['campaigns']
if not self.campaigns and record.get("lists"):
Expand Down Expand Up @@ -189,6 +165,40 @@ def preprocess_record(self, record: dict, context: dict):

# validate mapping
mapping = self.validate_output(mapping)

# 2. Check if record exist based on default lookup_fields or lookup_fields set in config
lookup_field = None
lookup_method = self.config.get("lookup_method", "sequential")
# check if record already exists
if record.get('id'):
# If contact has an Id will use it to updatev
mapping.update({"Id": record['id']})
lookup_field = f"Id = '{record['id']}'"
# if lookup_fields set for unified sinks check those first
elif self.lookup_fields_dict.get(self.contact_type):
lookup_values = {
field: mapping.get(field)
for field in self.lookup_fields_dict.get(self.contact_type)
if field in mapping
}
if lookup_values:
lookup_field = self.get_lookup_filter(lookup_values, lookup_method)
elif record.get("external_id"):
external_id = record["external_id"]
mapping[external_id["name"]] = external_id["value"]
lookup_field = f"{external_id['name']} = '{external_id['value']}'"
else:
# If no Id we'll use email to search for an existing record
if record.get('email'):
# Get contact_id based on email
data = self.query_sobject(
query = f"SELECT Name, Id from {self.contact_type} WHERE Email = '{record.get('email')}'",
fields = ['Name', 'Id']
)
if data:
id = data[0].get("Id")
mapping.update({"Id":id})
lookup_field = f"Id = '{id}'"

# If flag only_upsert_empty_fields is true, only upsert empty fields
if self.config.get("only_upsert_empty_fields") and lookup_field:
Expand Down Expand Up @@ -322,6 +332,7 @@ def reference_data(self):
return [{k: v for k, v in r.items() if k in ["Id", "Name"]} for r in response]

def preprocess_record(self, record, context):
#1. Map and process record
if isinstance(record.get("custom_fields"), str):
record["custom_fields"] = json.loads(record["custom_fields"])

Expand Down Expand Up @@ -381,8 +392,18 @@ def preprocess_record(self, record, context):
cf['name'] += '__c'
mapping.update({cf['name']:cf['value']})

# 2. get record using lookup_fields
lookup_field = None
if record.get("external_id"):
lookup_method = self.config.get("lookup_method", "sequential")
if self.lookup_fields_dict.get(self.name):
lookup_values = {
field: mapping.get(field)
for field in self.lookup_fields_dict.get(self.name)
if field in mapping
}
if lookup_values:
lookup_field = self.get_lookup_filter(lookup_values, lookup_method)
elif record.get("external_id"):
external_id = record["external_id"]
mapping[external_id["name"]] = external_id["value"]
lookup_field = f'{external_id["name"]} = {external_id["value"]}'
Expand Down Expand Up @@ -551,7 +572,7 @@ class CampaignSink(SalesforceV3Sink):
available_names = ["campaigns"]

def preprocess_record(self, record, context):

# 1. Process record
record = self.validate_input(record)

# fields = self.sf_fields_description
Expand Down Expand Up @@ -588,11 +609,23 @@ def preprocess_record(self, record, context):

mapping = self.validate_output(mapping)

# If flag only_upsert_empty_fields is true, only upsert empty fields
if self.config.get("only_upsert_empty_fields") and mapping.get("Id"):
# 2. Get record using lookup_fields
lookup_field = None
lookup_method = self.config.get("lookup_method", "sequential")
if self.lookup_fields_dict.get(self.name):
lookup_values = {
field: mapping.get(field)
for field in self.lookup_fields_dict.get(self.name)
if field in mapping
}
if lookup_values:
lookup_field = self.get_lookup_filter(lookup_values, lookup_method)
elif mapping.get("Id"):
lookup_field = f"Id = {mapping['Id']}"
mapping = self.map_only_empty_fields(mapping, "Campaign", lookup_field)

# If flag only_upsert_empty_fields is true, only upsert empty fields
if self.config.get("only_upsert_empty_fields") and lookup_field:
mapping = self.map_only_empty_fields(mapping, "Campaign", lookup_field)
return mapping


Expand Down Expand Up @@ -646,7 +679,7 @@ class CampaignMemberSink(SalesforceV3Sink):
available_names = ["campaignmembers"]

def preprocess_record(self, record, context) -> dict:

# 1. Map and process record
mapping = {
"CampaignId": record.get("campaign_id"),
# "Description": record.get("description"),
Expand Down Expand Up @@ -683,9 +716,22 @@ def preprocess_record(self, record, context) -> dict:

mapping = self.validate_output(mapping)

# If flag only_upsert_empty_fields is true, only upsert empty fields
if self.config.get("only_upsert_empty_fields") and mapping.get("Id"):
# 2. Get record using lookup_fields
lookup_field = None
lookup_method = self.config.get("lookup_method", "sequential")
if self.lookup_fields_dict.get(self.name):
lookup_values = {
field: mapping.get(field)
for field in self.lookup_fields_dict.get(self.name)
if field in mapping
}
if lookup_values:
lookup_field = self.get_lookup_filter(lookup_values, lookup_method)
elif mapping.get("Id"):
lookup_field = f"Id = {mapping['Id']}"

# If flag only_upsert_empty_fields is true, only upsert empty fields
if self.config.get("only_upsert_empty_fields") and lookup_field:
mapping = self.map_only_empty_fields(mapping, "CampaignMember", lookup_field)

return mapping
Expand All @@ -708,7 +754,7 @@ class ActivitiesSink(SalesforceV3Sink):
available_names = ["activities"]

def preprocess_record(self, record, context):

# 1. Map and process record
record = self.validate_input(record)

# fields = self.sf_fields_description
Expand Down Expand Up @@ -741,9 +787,22 @@ def preprocess_record(self, record, context):

mapping = self.validate_output(mapping)

# If flag only_upsert_empty_fields is true, only upsert empty fields
if self.config.get("only_upsert_empty_fields") and mapping.get("Id"):
# 2. Get record using lookup_fields
lookup_field = None
lookup_method = self.config.get("lookup_method", "sequential")
if self.lookup_fields_dict.get(self.name):
lookup_values = {
field: mapping.get(field)
for field in self.lookup_fields_dict.get(self.name)
if field in mapping
}
if lookup_values:
lookup_field = self.get_lookup_filter(lookup_values, lookup_method)
elif mapping.get("Id"):
lookup_field = f"Id = {mapping['Id']}"

# If flag only_upsert_empty_fields is true, only upsert empty fields
if self.config.get("only_upsert_empty_fields") and lookup_field:
mapping = self.map_only_empty_fields(mapping, "Task", lookup_field)

return mapping
Expand All @@ -752,13 +811,25 @@ def preprocess_record(self, record, context):
class FallbackSink(SalesforceV3Sink):
endpoint = "sobjects/"

@property
def lookup_fields_dict(self):
return self.config.get("lookup_fields") or {}

@property
def name(self):
return self.stream_name

def get_record(self, lookup_values, object_type, fields, record, method):
# get select fields for query
query_fields = [field for field in fields.keys() if field in record]
# add Id field to query_fields if it's not there
if "Id" not in query_fields:
query_fields.append("Id")
query_fields = ",".join(query_fields)

# get filter for query
query_filter = self.get_lookup_filter(lookup_values, method)
# execute query
query = f"SELECT {query_fields} FROM {object_type} WHERE {query_filter}"
req = self.request_api("GET", "queryAll", params={"q": query})
req = req.json().get("records")
return req

def get_fields_for_object(self, object_type):
"""Check if Salesforce has an object type and fetches its fields."""
Expand Down Expand Up @@ -796,20 +867,25 @@ def preprocess_record(self, record, context):
record["object_type"] = object_type

# If lookup_fields dict exist in config use it to check if the record exists in Salesforce
object_lookup_field = self.lookup_fields_dict.get(object_type)
# check if the lookup field exists for the object
object_lookup_field = object_lookup_field if object_lookup_field in fields else None
# check if the record has a value for the lookup field
lookup_value = record.get(object_lookup_field)
_lookup_fields = self.lookup_fields_dict.get(object_type)
lookup_method = self.config.get("lookup_method", "sequential")
# Standarize lookup fields as a list of strings
if isinstance(_lookup_fields, str):
_lookup_fields = [_lookup_fields]
# check if the lookup fields exist for the object
_lookup_fields = [field for field in _lookup_fields if field in fields]
# get all the values for all lookup fields
lookup_values = {field: record.get(field) for field in _lookup_fields}

req = None
# lookup for record with field from config
if object_lookup_field and lookup_value:
query_fields = ",".join([field for field in fields.keys() if field in record] + ["Id"])
query = f"SELECT {query_fields} FROM {object_type} WHERE {object_lookup_field} = '{lookup_value}'"
req = self.request_api("GET", "queryAll", params={"q": query})
req = req.json().get("records")
# lookup for record with email fields
if lookup_values:
if lookup_method == "all" and len(lookup_values) != len(_lookup_fields):
self.logger.info(f"Skipping lookup for record {record} because lookup_method is set to 'all' but not all fields have valid values")
else:
req = self.get_record(lookup_values, object_type, fields, record, lookup_method)

# If no lookup fields set lookup by email fields
else:
# Try to find object instance using email
email_fields = ["Email", "npe01__AlternateEmail__c", "npe01__HomeEmail__c", "npe01__Preferred_Email__c", "npe01__WorkEmail__c"]
Expand Down

0 comments on commit 7fd15f0

Please sign in to comment.