From 3e2d8f28cf908abadc714c6ebf92e5ab9a19c5c0 Mon Sep 17 00:00:00 2001 From: Vinicius Mesel <4984147+vmesel@users.noreply.github.com> Date: Thu, 9 Nov 2023 17:03:45 -0300 Subject: [PATCH] Removes fields_validation from the custom objects sink and adds new retries on posting objects with external ID Co-authored-by: Hassan Syyid --- target_salesforce_v3/client.py | 1 + target_salesforce_v3/sinks.py | 73 +++++++++++++++++----------------- 2 files changed, 38 insertions(+), 36 deletions(-) diff --git a/target_salesforce_v3/client.py b/target_salesforce_v3/client.py index 57e8732..6400fc6 100644 --- a/target_salesforce_v3/client.py +++ b/target_salesforce_v3/client.py @@ -225,6 +225,7 @@ def sf_fields_description(self, object_type=None): if not f["nillable"] and f["createable"] and not f["defaultedOnCreate"] ] fields["external_ids"] = [f["name"] for f in fld if f["externalId"]] + fields["idLookup"] = [f["name"] for f in fld if f["idLookup"] == True] fields["pickable"] = {} for field in fld: if field["picklistValues"]: diff --git a/target_salesforce_v3/sinks.py b/target_salesforce_v3/sinks.py index 2821c18..8c80185 100644 --- a/target_salesforce_v3/sinks.py +++ b/target_salesforce_v3/sinks.py @@ -460,11 +460,12 @@ def reference_contacts(self): return [{k: v for k, v in r.items() if k in ["Id", "Name"]} for r in response] def preprocess_record(self, record, context): - - installment_period = record.get("installment_period").title() - installment_period = self.get_pickable( - installment_period, "npe03__Installment_Period__c" - ) + installment_period = None + if record.get("installment_period"): + installment_period = record.get("installment_period").title() + installment_period = self.get_pickable( + installment_period, "npe03__Installment_Period__c" + ) fields_dict = self.sf_fields_description() if record.get("created_at"): @@ -710,25 +711,6 @@ def get_fields_for_object(self, object_type): raise MissingObjectInSalesforceError(f"Object type {object_type} not found in Salesforce.") - def validate_record(self, record, fields): - new_record = {} - for original_field, value in record.items(): - if original_field not in fields: - self.logger.warning( - f"Field {original_field} not found in Salesforce. Will not be synced." - ) - continue - - if fields[original_field]["nillable"] == False and value is None: - self.logger.warning( - f"Field {original_field} is not nullable. Will not be synced." - ) - continue - - new_record[original_field] = value - - return new_record - def preprocess_record(self, record, context): # Check if object exists in Salesforce object_type = None @@ -751,7 +733,6 @@ def preprocess_record(self, record, context): except MissingObjectInSalesforceError: self.logger.info("Skipping record, because it was not found on Salesforce.") return - record = self.validate_record(record, fields) record["object_type"] = object_type return record @@ -763,12 +744,17 @@ def process_record(self, record, context): self.logger.info(f"Processing record for type {self.stream_name} failed. Check logs.") return - required_fields = [] + fields_desc = self.sf_fields_description(object_type=object_type) + + possible_update_fields = [] + + for field in fields_desc["external_ids"]: + if field in record: + possible_update_fields.append(field) + if record.get("Id"): fields = ["Id"] else: - fields_desc = self.sf_fields_description(object_type=object_type) - required_fields = fields_desc["required"] list_fields = [field_list for field_list in fields_desc.values()] fields = [] for list_field in list_fields: @@ -794,16 +780,31 @@ def process_record(self, record, context): response = self.request_api("PATCH", endpoint=url, request_data=record) id = response.json().get("id") self.logger.info(f"{object_type} updated with id: {id}") + return except Exception as e: self.logger.exception(f"Error encountered while updating {object_type}") raise e - else: - try: - response = self.request_api("POST", endpoint=endpoint, request_data=record) - id = response.json().get("id") - self.logger.info(f"{object_type} created with id: {id}") - except Exception as e: - self.logger.exception(f"Error encountered while creating {object_type}") - raise e + if len(possible_update_fields) > 0: + for id_field in possible_update_fields: + try: + url = "/".join([endpoint, id_field, record.pop(id_field)]) + response = self.request_api("PATCH", endpoint=url, request_data=record) + id = response.json().get("id") + self.logger.info(f"{object_type} updated with id: {id}") + return + except Exception as e: + self.logger.exception(f"Could not PATCH to {url}: {e}") + + try: + if len(possible_update_fields) > 0: + self.logger.info("Failed to find updatable entity, trying to create it.") + + response = self.request_api("POST", endpoint=endpoint, request_data=record) + id = response.json().get("id") + self.logger.info(f"{object_type} created with id: {id}") + return + except Exception as e: + self.logger.exception(f"Error encountered while creating {object_type}") + raise e