Skip to content

Commit

Permalink
Add Attachment object handling (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrekkr authored Oct 17, 2024
1 parent 9573a24 commit dd194d3
Show file tree
Hide file tree
Showing 15 changed files with 1,458 additions and 480 deletions.
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ erDiagram
}
"Entity (User, Event,...)" {
string Id
}
ContentVersion {
string Id
Expand All @@ -189,13 +188,26 @@ erDiagram
string FileExtension
string VersionNumber
}
Attachment {
string Id
reference ParentId
string Name
datetime LastModifiedDate
}
ContentDocument ||--|{ ContentVersion : ""
ContentDocumentLink }o--|| ContentDocument : ""
ContentDocumentLink }o--|| "Entity (User, Event,...)" : ""
Attachment }o--|| "Entity (User, Event,...)" : ""
```
Files (`ContentDocument` objects) can be linked (using `ContentDocumentLink` objects) to multiple entities
(SF objects like `User`, `Case`, and so on). A file can have multiple versions (`ContentVersion` objects).

The `Attachment` object is a legacy way of storing files in Salesforce, and its use is no longer recommended.
However, it is still used in some orgs. An `Attachment` is linked to another object through its `ParentId` field,
and no versioning is available.

This project can handle both `ContentDocument` and `Attachment` objects.

### Download

Based on the configuration, the download process works as follows:
Expand Down
214 changes: 139 additions & 75 deletions src/salesforce_archivist/archivist.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@
import click
import humanize
from pydantic import BaseModel, Field, field_validator, ValidationInfo, computed_field
from typing import Optional
from typing_extensions import Annotated
from typing import Optional, Annotated
from simple_salesforce import Salesforce as SalesforceClient

from salesforce_archivist.salesforce.api import SalesforceApiClient
from salesforce_archivist.salesforce.download import DownloadContentVersionList, DownloadedContentVersionList
from salesforce_archivist.salesforce.download import (
DownloadContentVersionList,
DownloadedList,
DownloadAttachmentList,
DownloadStats,
)
from salesforce_archivist.salesforce.salesforce import Salesforce
from salesforce_archivist.salesforce.validation import ValidatedContentVersionList
from salesforce_archivist.salesforce.validation import ValidatedList, ValidationStats


class ArchivistObject(BaseModel):
Expand Down Expand Up @@ -87,96 +91,156 @@ def __init__(
self._max_workers = max_workers

# Download the files of every configured object and report whether the whole
# run finished without errors.
#
# NOTE(review): this span looks like a rendered diff hunk, not runnable code --
# lines from before and after the change appear interleaved without +/- markers
# and without indentation (e.g. the two consecutive assignments to
# `downloaded_content_versions_list`, and the dict-based `global_stats` next to
# `DownloadStats()`). Confirm against the repository before relying on it.
def download(self) -> bool:
downloaded_content_versions_list = DownloadedContentVersionList(self._data_dir)
downloaded_content_versions_list = DownloadedList(self._data_dir, "downloaded_versions.csv")
# Reload the previously saved list when its data file already exists.
if downloaded_content_versions_list.data_file_exist():
downloaded_content_versions_list.load_data_from_file()

global_stats = {
"total": 0,
"processed": 0,
"errors": 0,
"size": 0,
}
# Attachments are tracked in their own file, separate from content versions.
downloaded_attachment_list = DownloadedList(self._data_dir, "downloaded_attachments.csv")
if downloaded_attachment_list.data_file_exist():
downloaded_attachment_list.load_data_from_file()

global_stats = DownloadStats()
for archivist_obj in self._objects.values():
obj_type = archivist_obj.obj_type
salesforce = Salesforce(
archivist_obj=archivist_obj,
client=SalesforceApiClient(self._sf_client),
max_api_usage_percent=self._max_api_usage_percent,
)
self._print_msg(msg="Downloading document link list.", obj_type=obj_type)
document_link_list = salesforce.load_content_document_link_list()
self._print_msg(msg="Done.", obj_type=obj_type)
self._print_msg(msg="Downloading content version list.", obj_type=obj_type)
content_version_list = salesforce.load_content_version_list(document_link_list=document_link_list)
self._print_msg(msg="Done.", obj_type=obj_type)
self._print_msg(msg="Downloading files.", obj_type=obj_type)
download_list = DownloadContentVersionList(
document_link_list=document_link_list,
content_version_list=content_version_list,
data_dir=archivist_obj.obj_dir,
)
stats = salesforce.download_files(
download_content_version_list=download_list,
downloaded_content_version_list=downloaded_content_versions_list,
max_workers=self._max_workers,
)
global_stats["total"] += stats.total
global_stats["processed"] += stats.processed
global_stats["errors"] += stats.errors
global_stats["size"] += stats.size
# Objects of type "Attachment" take the attachment path; every other object
# type takes the ContentVersion path.
if obj_type == "Attachment":
self._download_attachments(archivist_obj, downloaded_attachment_list, global_stats)
else:
self._download_files(archivist_obj, downloaded_content_versions_list, global_stats)

# The summary line is green/"SUCCESS" only when no errors were counted.
status = "SUCCESS" if global_stats["errors"] == 0 else "FAILED"
color = "green" if global_stats["errors"] == 0 else "red"
status = "SUCCESS" if global_stats.errors == 0 else "FAILED"
color = "green" if global_stats.errors == 0 else "red"
click.secho(
"[{status}] Download finished. Processed {processed}/{total} ({processed_size}), {errors} errors.".format(
status=status, processed_size=humanize.naturalsize(global_stats["size"], binary=True), **global_stats
status=status,
processed=global_stats.processed,
processed_size=humanize.naturalsize(global_stats.size, binary=True),
errors=global_stats.errors,
total=global_stats.total,
),
fg=color,
)
# True only when the accumulated error count is zero.
return global_stats["errors"] == 0
return global_stats.errors == 0

def _download_files(
    self,
    archivist_obj: ArchivistObject,
    downloaded_list: DownloadedList,
    global_stats: DownloadStats,
) -> None:
    """Download the ContentVersion files linked to one configured object.

    Loads the object's content document links, then the content versions for
    those links, downloads the files and merges the per-object statistics
    into ``global_stats`` with ``combine``.

    Args:
        archivist_obj: Configured object whose linked files are downloaded.
        downloaded_list: Forwarded to ``Salesforce.download_files`` as
            ``downloaded_list``.
        global_stats: Run-wide statistics that receive this object's stats.
    """
    kind = archivist_obj.obj_type

    def report(text: str) -> None:
        # Every progress line of this method is tagged with the same object type.
        self._print_msg(msg=text, obj_type=kind)

    api_client = SalesforceApiClient(self._sf_client)
    sf = Salesforce(
        archivist_obj=archivist_obj,
        client=api_client,
        max_api_usage_percent=self._max_api_usage_percent,
    )

    report("Downloading document link list.")
    links = sf.load_content_document_link_list()
    report("Done.")

    report("Downloading content version list.")
    versions = sf.load_content_version_list(document_link_list=links)
    report("Done.")

    report("Downloading files.")
    pending = DownloadContentVersionList(
        document_link_list=links,
        content_version_list=versions,
        data_dir=archivist_obj.obj_dir,
    )
    object_stats = sf.download_files(
        download_list=pending,
        downloaded_list=downloaded_list,
        max_workers=self._max_workers,
    )
    global_stats.combine(object_stats)

def _download_attachments(
    self, archivist_obj: ArchivistObject, downloaded_list: DownloadedList, global_stats: DownloadStats
) -> None:
    """Download the ``Attachment`` records of one configured object.

    Loads the attachment list, downloads the files and merges the per-object
    statistics into ``global_stats`` with ``combine``.

    Args:
        archivist_obj: Configured object whose attachments are downloaded.
        downloaded_list: Forwarded to ``Salesforce.download_files`` as
            ``downloaded_list``.
        global_stats: Run-wide statistics that receive this object's stats.
    """
    kind = archivist_obj.obj_type

    def report(text: str) -> None:
        # Every progress line of this method is tagged with the same object type.
        self._print_msg(msg=text, obj_type=kind)

    api_client = SalesforceApiClient(self._sf_client)
    sf = Salesforce(
        archivist_obj=archivist_obj,
        client=api_client,
        max_api_usage_percent=self._max_api_usage_percent,
    )

    report("Downloading attachment list.")
    attachments = sf.load_attachment_list()
    report("Done.")

    report("Downloading files.")
    pending = DownloadAttachmentList(
        attachment_list=attachments,
        data_dir=archivist_obj.obj_dir,
    )
    object_stats = sf.download_files(
        download_list=pending,
        downloaded_list=downloaded_list,
        max_workers=self._max_workers,
    )
    global_stats.combine(object_stats)

def _validate_content_versions_download(
    self, archivist_obj: ArchivistObject, validated_list: ValidatedList, global_stats: ValidationStats
) -> bool:
    """Validate the downloaded ContentVersion files of one configured object.

    Rebuilds the download list from the object's content document links and
    content versions, runs ``Salesforce.validate_download`` on it and merges
    the per-object statistics into ``global_stats`` with ``combine``.

    Args:
        archivist_obj: Configured object whose downloads are validated.
        validated_list: Forwarded to ``Salesforce.validate_download`` as
            ``validated_list``.
        global_stats: Run-wide statistics that receive this object's stats.

    Returns:
        ``True`` when this object's statistics report zero invalid entries.
    """
    api_client = SalesforceApiClient(self._sf_client)
    sf = Salesforce(
        archivist_obj=archivist_obj,
        client=api_client,
        max_api_usage_percent=self._max_api_usage_percent,
    )

    links = sf.load_content_document_link_list()
    versions = sf.load_content_version_list(document_link_list=links)
    expected = DownloadContentVersionList(
        document_link_list=links,
        content_version_list=versions,
        data_dir=archivist_obj.obj_dir,
    )

    object_stats = sf.validate_download(
        download_list=expected,
        validated_list=validated_list,
        max_workers=self._max_workers,
    )
    global_stats.combine(object_stats)

    all_valid = object_stats.invalid == 0
    return all_valid

def _validate_attachments_download(
    self, archivist_obj: ArchivistObject, validated_list: ValidatedList, global_stats: ValidationStats
) -> bool:
    """Validate the downloaded ``Attachment`` files of one configured object.

    Rebuilds the download list from the object's attachment list, runs
    ``Salesforce.validate_download`` on it and merges the per-object
    statistics into ``global_stats`` with ``combine``.

    Args:
        archivist_obj: Configured object whose downloads are validated.
        validated_list: Forwarded to ``Salesforce.validate_download`` as
            ``validated_list``.
        global_stats: Run-wide statistics that receive this object's stats.

    Returns:
        ``True`` when this object's statistics report zero invalid entries.
    """
    api_client = SalesforceApiClient(self._sf_client)
    sf = Salesforce(
        archivist_obj=archivist_obj,
        client=api_client,
        max_api_usage_percent=self._max_api_usage_percent,
    )

    attachments = sf.load_attachment_list()
    expected = DownloadAttachmentList(
        attachment_list=attachments,
        data_dir=archivist_obj.obj_dir,
    )

    object_stats = sf.validate_download(
        download_list=expected,
        validated_list=validated_list,
        max_workers=self._max_workers,
    )
    global_stats.combine(object_stats)

    all_valid = object_stats.invalid == 0
    return all_valid

# Validate the downloads of every configured object and report whether no
# invalid entries were counted.
#
# NOTE(review): this span looks like a rendered diff hunk, not runnable code --
# lines from before and after the change appear interleaved without +/- markers
# and without indentation (e.g. `ValidatedContentVersionList` next to
# `ValidatedList`, and the dict-based `global_stats` next to `ValidationStats()`).
# Confirm against the repository before relying on it.
def validate(self) -> bool:
validated_versions_list = ValidatedContentVersionList(self._data_dir)
if validated_versions_list.data_file_exist():
validated_versions_list.load_data_from_file()
global_stats = {
"total": 0,
"processed": 0,
"invalid": 0,
}
validated_list = ValidatedList(self._data_dir)
# Reload the previously saved list when its data file already exists.
if validated_list.data_file_exist():
validated_list.load_data_from_file()
global_stats = ValidationStats()
for archivist_obj in self._objects.values():
salesforce = Salesforce(
archivist_obj=archivist_obj,
client=SalesforceApiClient(self._sf_client),
max_api_usage_percent=self._max_api_usage_percent,
)
document_link_list = salesforce.load_content_document_link_list()
content_version_list = salesforce.load_content_version_list(
document_link_list=document_link_list,
)
download_list = DownloadContentVersionList(
document_link_list=document_link_list,
content_version_list=content_version_list,
data_dir=archivist_obj.obj_dir,
)
stats = salesforce.validate_download(
download_content_version_list=download_list,
validated_content_version_list=validated_versions_list,
max_workers=self._max_workers,
)
global_stats["total"] += stats.total
global_stats["processed"] += stats.processed
global_stats["invalid"] += stats.invalid
status = "SUCCESS" if global_stats["invalid"] == 0 else "FAILED"
color = "green" if global_stats["invalid"] == 0 else "red"
# Objects of type "Attachment" take the attachment path; every other object
# type takes the ContentVersion path. The helpers' boolean results are not
# used here; the outcome is read from `global_stats` below.
if archivist_obj.obj_type == "Attachment":
self._validate_attachments_download(archivist_obj, validated_list, global_stats)
else:
self._validate_content_versions_download(archivist_obj, validated_list, global_stats)
# The summary line is green/"SUCCESS" only when no invalid entries were counted.
status = "SUCCESS" if global_stats.invalid == 0 else "FAILED"
color = "green" if global_stats.invalid == 0 else "red"
click.secho(
"[{status}] Download validation finished. Processed {processed}/{total}, {invalid} errors.".format(
status=status, **global_stats
status=status, processed=global_stats.processed, invalid=global_stats.invalid, total=global_stats.total
),
fg=color,
)
# True only when the accumulated invalid count is zero.
return global_stats["invalid"] == 0
return global_stats.invalid == 0

@staticmethod
def _print_msg(msg: str, obj_type: str, fg: str | None = None) -> None:
Expand Down
10 changes: 10 additions & 0 deletions src/salesforce_archivist/salesforce/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from simple_salesforce import Salesforce as SimpleSFClient
from simple_salesforce.api import Usage

from salesforce_archivist.salesforce.attachment import Attachment
from salesforce_archivist.salesforce.content_version import ContentVersion


Expand Down Expand Up @@ -44,6 +45,15 @@ def download_content_version(self, version: ContentVersion) -> Response:
)
return result

def download_attachment(self, attachment: Attachment) -> Response:
    """Request the body of an ``Attachment`` record as a streamed response.

    Sends a GET to ``{base_url}/sobjects/Attachment/{id}/body`` through the
    wrapped simple_salesforce client with ``stream=True``.

    Args:
        attachment: Attachment whose ``id`` selects the record to fetch.

    Returns:
        The response object returned by the client's ``_call_salesforce``.
    """
    client = self._simple_sf_client
    body_url = "{base}/sobjects/Attachment/{id}/body".format(base=client.base_url, id=attachment.id)
    response: Response = client._call_salesforce(
        url=body_url,
        method="GET",
        headers={"Content-Type": "application/octet-stream"},
        stream=True,
    )
    return response

def get_api_usage(self, refresh: bool = False) -> ApiUsage:
if refresh or self._simple_sf_client.api_usage.get("api-usage") is None:
self._simple_sf_client.limits()
Expand Down
Loading

0 comments on commit dd194d3

Please sign in to comment.