Skip to content

Commit

Permalink
Removes fields_validation from the custom objects sink and adds new r…
Browse files Browse the repository at this point in the history
…etries on posting objects with external ID

Co-authored-by: Hassan Syyid <[email protected]>
  • Loading branch information
vmesel and Hassan Syyid committed Nov 9, 2023
1 parent bbef191 commit 3e2d8f2
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 36 deletions.
1 change: 1 addition & 0 deletions target_salesforce_v3/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ def sf_fields_description(self, object_type=None):
if not f["nillable"] and f["createable"] and not f["defaultedOnCreate"]
]
fields["external_ids"] = [f["name"] for f in fld if f["externalId"]]
fields["idLookup"] = [f["name"] for f in fld if f["idLookup"] == True]
fields["pickable"] = {}
for field in fld:
if field["picklistValues"]:
Expand Down
73 changes: 37 additions & 36 deletions target_salesforce_v3/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,12 @@ def reference_contacts(self):
return [{k: v for k, v in r.items() if k in ["Id", "Name"]} for r in response]

def preprocess_record(self, record, context):

installment_period = record.get("installment_period").title()
installment_period = self.get_pickable(
installment_period, "npe03__Installment_Period__c"
)
installment_period = None
if record.get("installment_period"):
installment_period = record.get("installment_period").title()
installment_period = self.get_pickable(
installment_period, "npe03__Installment_Period__c"
)

fields_dict = self.sf_fields_description()
if record.get("created_at"):
Expand Down Expand Up @@ -710,25 +711,6 @@ def get_fields_for_object(self, object_type):

raise MissingObjectInSalesforceError(f"Object type {object_type} not found in Salesforce.")

def validate_record(self, record, fields):
new_record = {}
for original_field, value in record.items():
if original_field not in fields:
self.logger.warning(
f"Field {original_field} not found in Salesforce. Will not be synced."
)
continue

if fields[original_field]["nillable"] == False and value is None:
self.logger.warning(
f"Field {original_field} is not nullable. Will not be synced."
)
continue

new_record[original_field] = value

return new_record

def preprocess_record(self, record, context):
# Check if object exists in Salesforce
object_type = None
Expand All @@ -751,7 +733,6 @@ def preprocess_record(self, record, context):
except MissingObjectInSalesforceError:
self.logger.info("Skipping record, because it was not found on Salesforce.")
return
record = self.validate_record(record, fields)
record["object_type"] = object_type
return record

Expand All @@ -763,12 +744,17 @@ def process_record(self, record, context):
self.logger.info(f"Processing record for type {self.stream_name} failed. Check logs.")
return

required_fields = []
fields_desc = self.sf_fields_description(object_type=object_type)

possible_update_fields = []

for field in fields_desc["external_ids"]:
if field in record:
possible_update_fields.append(field)

if record.get("Id"):
fields = ["Id"]
else:
fields_desc = self.sf_fields_description(object_type=object_type)
required_fields = fields_desc["required"]
list_fields = [field_list for field_list in fields_desc.values()]
fields = []
for list_field in list_fields:
Expand All @@ -794,16 +780,31 @@ def process_record(self, record, context):
response = self.request_api("PATCH", endpoint=url, request_data=record)
id = response.json().get("id")
self.logger.info(f"{object_type} updated with id: {id}")
return
except Exception as e:
self.logger.exception(f"Error encountered while updating {object_type}")
raise e

else:
try:
response = self.request_api("POST", endpoint=endpoint, request_data=record)
id = response.json().get("id")
self.logger.info(f"{object_type} created with id: {id}")
except Exception as e:
self.logger.exception(f"Error encountered while creating {object_type}")
raise e
if len(possible_update_fields) > 0:
for id_field in possible_update_fields:
try:
url = "/".join([endpoint, id_field, record.pop(id_field)])
response = self.request_api("PATCH", endpoint=url, request_data=record)
id = response.json().get("id")
self.logger.info(f"{object_type} updated with id: {id}")
return
except Exception as e:
self.logger.exception(f"Could not PATCH to {url}: {e}")

try:
if len(possible_update_fields) > 0:
self.logger.info("Failed to find updatable entity, trying to create it.")

response = self.request_api("POST", endpoint=endpoint, request_data=record)
id = response.json().get("id")
self.logger.info(f"{object_type} created with id: {id}")
return
except Exception as e:
self.logger.exception(f"Error encountered while creating {object_type}")
raise e

0 comments on commit 3e2d8f2

Please sign in to comment.