Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support to send empty fields for unified sinks #20

Merged
merged 1 commit into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions target_salesforce_v3/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ def upsert_record(self, record: dict, context: dict) -> None:
# Getting custom fields from record
# self.process_custom_fields(record)

if not record:
return "", True, {"state": "no fields to post or update"}

fields = self.sf_fields_description()

for field in fields["external_ids"]:
Expand Down Expand Up @@ -301,10 +304,12 @@ def validate_output(self, mapping):
# raise MissingRequiredFieldException(req_field)
return payload

def query_sobject(self, query, fields):
def query_sobject(self, query, fields=None):
params = {"q": query}
response = self.request_api("GET", endpoint="query", params=params)
response = response.json()["records"]
if not fields:
return response
return [{k: v for k, v in r.items() if k in fields} for r in response]

def process_custom_fields(self, record) -> None:
Expand Down Expand Up @@ -419,4 +424,14 @@ def update_field_permissions(self,permission_set_id, sobject_type, field_name):
}

response = self.request_api("POST", endpoint="composite", request_data=payload, headers={"Content-Type": "application/json"})
self.logger.info(f"Field permission for {field_name} updated for permission set {permission_set_id}, response: {response.text}")
self.logger.info(f"Field permission for {field_name} updated for permission set {permission_set_id}, response: {response.text}")


def map_only_empty_fields(self, mapping, sobject_name, lookup_field):
fields = ",".join([field for field in mapping.keys()])
data = self.query_sobject(
query = f"SELECT {fields} from {sobject_name} WHERE {lookup_field}",
)
if data:
mapping = {k:v for k,v in mapping.items() if not data[0].get(k) or k == "Id"}
return mapping
58 changes: 52 additions & 6 deletions target_salesforce_v3/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def reference_data(self):

def preprocess_record(self, record: dict, context: dict):

# Parse data
if isinstance(record.get("addresses"), str):
record["addresses"] = json.loads(record["addresses"])

Expand Down Expand Up @@ -67,6 +68,7 @@ def preprocess_record(self, record: dict, context: dict):

# fields = self.sf_fields_description

# map data
mapping = {
"FirstName": record.get("first_name"),
"LastName": record.get("last_name"),
Expand Down Expand Up @@ -98,12 +100,17 @@ def preprocess_record(self, record: dict, context: dict):
elif self.contact_type == "Lead":
mapping.update({"Company": record.get("company_name")})


# check if record already exists
lookup_field = None
if record.get('id'):
# If contact has an Id will use it to updatev
mapping.update({"Id": record['id']})
lookup_field = f"Id = '{record['id']}'"
elif record.get("external_id"):
external_id = record["external_id"]
mapping[external_id["name"]] = external_id["value"]
lookup_field = f"{external_id['name']} = '{external_id['value']}'"
else:
# If no Id we'll use email to search for an existing record
if record.get('email'):
Expand All @@ -113,7 +120,9 @@ def preprocess_record(self, record: dict, context: dict):
fields = ['Name', 'Id']
)
if data:
mapping.update({"Id":data[0].get("Id")})
id = data[0].get("Id")
mapping.update({"Id":id})
lookup_field = f"Id = '{id}'"

if record.get('campaigns'):
self.campaigns = record['campaigns']
Expand Down Expand Up @@ -178,7 +187,14 @@ def preprocess_record(self, record: dict, context: dict):
)
mapping["AccountId"] = next(account_id, None)

return self.validate_output(mapping)
# validate mapping
mapping = self.validate_output(mapping)

# If flag only_upsert_empty_fields is true, only upsert empty fields
if self.config.get("only_upsert_empty_fields") and lookup_field:
mapping = self.map_only_empty_fields(mapping, self.contact_type, lookup_field)

return mapping

def upsert_record(self, record, context):
"""Process the record."""
Expand Down Expand Up @@ -367,11 +383,19 @@ def preprocess_record(self, record, context):
cf['name'] += '__c'
mapping.update({cf['name']:cf['value']})

lookup_field = None
if record.get("external_id"):
external_id = record["external_id"]
mapping[external_id["name"]] = external_id["value"]
lookup_field = f'{external_id["name"]} = {external_id["value"]}'

return self.validate_output(mapping)
mapping = self.validate_output(mapping)

# If flag only_upsert_empty_fields is true, only upsert empty fields
if self.config.get("only_upsert_empty_fields") and lookup_field:
mapping = self.map_only_empty_fields(mapping, "Opportunity", lookup_field)

return mapping


class CompanySink(SalesforceV3Sink):
Expand Down Expand Up @@ -564,7 +588,15 @@ def preprocess_record(self, record, context):
cf['name'] += '__c'
mapping.update({cf['name']:cf['value']})

return self.validate_output(mapping)
mapping = self.validate_output(mapping)

# If flag only_upsert_empty_fields is true, only upsert empty fields
if self.config.get("only_upsert_empty_fields") and mapping.get("Id"):
lookup_field = f"Id = {mapping['Id']}"
mapping = self.map_only_empty_fields(mapping, "Campaign", lookup_field)

return mapping


def upsert_record(self, record, context):
"""Process the record."""
Expand Down Expand Up @@ -651,7 +683,14 @@ def preprocess_record(self, record, context) -> dict:
cf['name'] += '__c'
mapping.update({cf['name']:cf['value']})

return self.validate_output(mapping)
mapping = self.validate_output(mapping)

# If flag only_upsert_empty_fields is true, only upsert empty fields
if self.config.get("only_upsert_empty_fields") and mapping.get("Id"):
lookup_field = f"Id = {mapping['Id']}"
mapping = self.map_only_empty_fields(mapping, "CampaignMember", lookup_field)

return mapping

def get_campaign_member_id(self, contact_id, campaign_id, contact_lookup = 'ContactId'):

Expand Down Expand Up @@ -702,7 +741,14 @@ def preprocess_record(self, record, context):
cf['name'] += '__c'
mapping.update({cf['name']:cf['value']})

return self.validate_output(mapping)
mapping = self.validate_output(mapping)

# If flag only_upsert_empty_fields is true, only upsert empty fields
if self.config.get("only_upsert_empty_fields") and mapping.get("Id"):
lookup_field = f"Id = {mapping['Id']}"
mapping = self.map_only_empty_fields(mapping, "Task", lookup_field)

return mapping


class FallbackSink(SalesforceV3Sink):
Expand Down
Loading