Skip to content

Commit

Permalink
Merge pull request #25 from xacadil/contentdocument-download-files
Browse files Browse the repository at this point in the history
Download content document file if download_files is true
  • Loading branch information
hsyyid authored Feb 2, 2023
2 parents a017462 + 03ab17f commit 504eb2b
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 7 deletions.
6 changes: 3 additions & 3 deletions tap_salesforce/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ def do_discover(sf):
result = {'streams': entries}
json.dump(result, sys.stdout, indent=4)

def do_sync(sf, catalog, state):
def do_sync(sf, catalog, state,config=None):
input_state = state.copy()
starting_stream = state.get("current_stream")

Expand Down Expand Up @@ -494,7 +494,7 @@ def do_sync(sf, catalog, state):
catalog_entry['tap_stream_id'],
'version',
stream_version)
counter = sync_stream(sf, catalog_entry, state, input_state, catalog)
counter = sync_stream(sf, catalog_entry, state, input_state, catalog,config)
LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter.value)

state["current_stream"] = None
Expand Down Expand Up @@ -528,7 +528,7 @@ def main_impl():
elif args.properties:
catalog = args.properties
state = build_state(args.state, catalog)
do_sync(sf, catalog, state)
do_sync(sf, catalog, state,CONFIG)
finally:
if sf:
if sf.rest_requests_attempted > 0:
Expand Down
23 changes: 19 additions & 4 deletions tap_salesforce/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from singer import Transformer, metadata, metrics
from requests.exceptions import RequestException
from tap_salesforce.salesforce.bulk import Bulk
import base64

LOGGER = singer.get_logger()

Expand Down Expand Up @@ -98,12 +99,12 @@ def resume_syncing_bulk_query(sf, catalog_entry, job_id, state, counter):

return counter

def sync_stream(sf, catalog_entry, state, input_state, catalog):
def sync_stream(sf, catalog_entry, state, input_state, catalog,config=None):
stream = catalog_entry['stream']

with metrics.record_counter(stream) as counter:
try:
sync_records(sf, catalog_entry, state, input_state, counter, catalog)
sync_records(sf, catalog_entry, state, input_state, counter, catalog,config)
singer.write_state(state)
except RequestException as ex:
raise Exception("Error syncing {}: {} Response: {}".format(
Expand Down Expand Up @@ -202,7 +203,11 @@ def handle_ListView(sf,rec_id,sobject,lv_name,lv_catalog_entry,state,input_state
version=lv_stream_version,
time_extracted=start_time))

def sync_records(sf, catalog_entry, state, input_state, counter, catalog):
def sync_records(sf, catalog_entry, state, input_state, counter, catalog,config=None):
download_files = False
if "download_files" in config:
if config['download_files']==True:
download_files = True
chunked_bookmark = singer_utils.strptime_with_tz(sf.get_start_date(state, catalog_entry))
stream = catalog_entry['stream']
schema = catalog_entry['schema']
Expand Down Expand Up @@ -360,7 +365,10 @@ def unwrap_query(query_response, query_field):
with Transformer(pre_hook=transform_bulk_data_hook) as transformer:
rec = transformer.transform(rec, schema)
rec = fix_record_anytype(rec, schema)

if stream=='ContentVersion':
if "IsLatest" in rec:
if rec['IsLatest']==True and download_files==True:
rec['TextPreview'] = base64.b64encode(get_content_document_file(sf,rec['Id'])).decode('utf-8')
singer.write_message(
singer.RecordMessage(
stream=(
Expand Down Expand Up @@ -429,3 +437,10 @@ def try_cast(val, coercion):
rec[k] = val

return rec

def get_content_document_file(sf, contentid):
    """Download the raw file body of a Salesforce ContentVersion record.

    Issues a GET against the REST ``VersionData`` sub-resource, which
    returns the stored file's binary payload (the caller base64-encodes it
    before emitting the record).

    Args:
        sf: Salesforce client object exposing ``_get_standard_headers()``,
            ``data_url``, ``instance_url`` and ``_make_request()``.
        contentid: Id of the ContentVersion record whose file to fetch.

    Returns:
        bytes: the response body of the VersionData endpoint.
    """
    request_headers = sf._get_standard_headers()
    version_data_endpoint = f"sobjects/ContentVersion/{contentid}/VersionData"
    request_url = sf.data_url.format(sf.instance_url, version_data_endpoint)
    return sf._make_request('GET', request_url, headers=request_headers).content

0 comments on commit 504eb2b

Please sign in to comment.