Skip to content

Commit

Permalink
add support to send attachments (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
keyn4 authored Dec 5, 2024
1 parent 1c221d2 commit 7cb6ff1
Showing 1 changed file with 43 additions and 1 deletion.
44 changes: 43 additions & 1 deletion target_salesforce_v3/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,8 @@ def lookup_fields_dict(self):
@property
def name(self):
return self.stream_name

not_searchable_by_mail = ["ContentVersion"]

def get_fields_for_object(self, object_type):
"""Check if Salesforce has an object type and fetches its fields."""
Expand Down Expand Up @@ -883,6 +885,10 @@ def preprocess_record(self, record, context):
self.logger.info("Skipping record, because it was not found on Salesforce.")
return {}

# add field to link attachments
if self.name == "ContentVersion":
fields.update({"LinkedEntityId": {"createable": True}})

# keep only available fields and that are creatable or updatable
# NOTE: we need to keep relations (__r, xId)
record = {k:v for k,v in record.items() if k.endswith("__r") or fields.get(k+"Id") or (fields.get(k) and (fields[k]["createable"] or fields[k]["updateable"] or k.lower() in ["id", "externalid"]))}
Expand All @@ -907,7 +913,7 @@ def preprocess_record(self, record, context):
req = self.request_api("GET", "queryAll", params={"q": query})
req = req.json().get("records")
# lookup for record with email fields
elif self.config.get("lookup_by_email", True):
elif self.config.get("lookup_by_email", True) and self.name not in self.not_searchable_by_mail:
# Try to find object instance using email
email_fields = ["Email", "npe01__AlternateEmail__c", "npe01__HomeEmail__c", "npe01__Preferred_Email__c", "npe01__WorkEmail__c"]
email_values = [record.get(email_field) for email_field in email_fields if record.get(email_field)]
Expand Down Expand Up @@ -954,7 +960,11 @@ def upsert_record(self, record, context):
if record == {}:
self.logger.info(f"Processing record for type {self.stream_name} failed. Check logs.")
return

# for files pop object id to link the file
linked_object_id = record.pop("LinkedEntityId", None) if self.name == "ContentVersion" else None

# get object fields
fields_desc = self.sf_fields_description(object_type=object_type)

possible_update_fields = []
Expand Down Expand Up @@ -999,6 +1009,7 @@ def upsert_record(self, record, context):
return object_id, True, state_updates

id = response.json().get("id")
self.link_attachment_to_object(id, linked_object_id)
self.logger.info(f"{object_type} updated with id: {id}")
return id, True, state_updates
except Exception as e:
Expand All @@ -1011,6 +1022,7 @@ def upsert_record(self, record, context):
response = self.request_api("PATCH", endpoint=url, request_data={k: record[k] for k in set(list(record.keys())) - set([id_field])})
id = response.json().get("id")
self.logger.info(f"{object_type} updated with id: {id}")
self.link_attachment_to_object(id, linked_object_id)
return id, True, state_updates
except Exception as e:
self.logger.exception(f"Could not PATCH to {url}: {e}")
Expand All @@ -1022,6 +1034,7 @@ def upsert_record(self, record, context):
response = self.request_api("POST", endpoint=endpoint, request_data=record)
id = response.json().get("id")
self.logger.info(f"{object_type} created with id: {id}")
self.link_attachment_to_object(id, linked_object_id)
return id, True, state_updates
except Exception as e:
if "INVALID_FIELD_FOR_INSERT_UPDATE" in str(e):
Expand All @@ -1047,3 +1060,32 @@ def upsert_record(self, record, context):
self.logger.exception(f"Error encountered while creating {object_type}")
raise e


def link_attachment_to_object(self, file_id, linked_object_id):
if self.name != "ContentVersion":
return
if not linked_object_id:
self.logger.info(f"Object id not found to link file with id {file_id}")
return
try:
# get contentdocumentid
content_endpoint = "query"
params = {"q": f"SELECT ContentDocumentId FROM ContentVersion WHERE Id = '{file_id}'"}
content_document_id = self.request_api("GET", endpoint=content_endpoint, params=params)
content_document_id = content_document_id.json()
if content_document_id.get("records"):
content_document_id = content_document_id["records"][0]["ContentDocumentId"]
else:
raise Exception(f"Failed while trying to link file {file_id} and object {linked_object_id} because ContentDocumentId was not found")

endpoint = "sobjects/ContentDocumentLink"
record = {
"ContentDocumentId": content_document_id,
"LinkedEntityId": linked_object_id,
"ShareType": "V"
}
response = self.request_api("POST", endpoint=endpoint, request_data=record)
self.logger.info(f"File with id {file_id} succesfully linked to object with id {linked_object_id}. Link id {response.json()['id']}")
except Exception as e:
self.logger.info(f"Failed while trying to link file {file_id} and object {linked_object_id}")
raise e

0 comments on commit 7cb6ff1

Please sign in to comment.