Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

add support to fetch xlsx reports #39

Open
wants to merge 2 commits into
base: feature/hgi-6644
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
install_requires=[
'requests>=2.20.0,<=2.29.0',
'singer-python==5.3.1',
'xmltodict==0.11.0'
'xmltodict==0.11.0',
'openpyxl==3.1.5'
],
entry_points='''
[console_scripts]
Expand Down
122 changes: 90 additions & 32 deletions tap_salesforce/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,22 @@ def build_state(raw_state, catalog):
return state

# pylint: disable=undefined-variable
def create_property_schema(field, mdata):
field_name = field['name']
def create_property_schema(field, mdata, is_report=False):
    """Build the JSON-schema property and metadata for one Salesforce field.

    Args:
        field: field description dict; regular sObject fields carry their
            identifier under 'name', report columns under 'label'.
        mdata: compiled singer metadata map being accumulated for the stream.
        is_report (bool): True when *field* comes from a report's
            reportExtendedMetadata (detailColumnInfo) instead of an sObject
            describe.

    Returns:
        Tuple of (property_schema, mdata) — the field's schema and the
        updated metadata map.
    """
    # Report columns are keyed by 'label'; sObject fields by 'name'.
    field_name = field['label'] if is_report else field['name']

    # 'Id' is always replicated; every other field is opt-in.
    inclusion = 'automatic' if field_name == "Id" else 'available'
    mdata = metadata.write(
        mdata, ('properties', field_name), 'inclusion', inclusion)

    if is_report:
        # Keep the raw column metadata alongside the singer metadata.
        # Use metadata.write for consistency with every other metadata
        # mutation in this module instead of assigning into mdata directly.
        mdata = metadata.write(
            mdata, ('properties', field_name), 'field_metadata', field)

    property_schema, mdata = tap_salesforce.salesforce.field_to_property_schema(field, mdata, is_report)

    return (property_schema, mdata)

Expand Down Expand Up @@ -203,6 +208,45 @@ def generate_schema(fields, sf, sobject_name, replication_key, sobject_descripti

return entry

def generate_report_schema(fields, report):
    """Build a catalog entry for a single Salesforce report stream.

    ``fields`` maps column ids to the report's column metadata dicts
    (each carrying at least a 'label'); ``report`` is the report record
    with DeveloperName, Name, FolderName and Id.

    Returns the catalog entry dict for the report stream.
    """
    mdata = metadata.new()
    properties = {}

    # Each report column becomes one schema property keyed by its label.
    for column in fields.values():
        column_schema, mdata = create_property_schema(column, mdata, True)
        properties[column["label"]] = column_schema

    # Reports have no replication key: force full-table sync and expose
    # no key properties.
    mdata = metadata.write(
        mdata, (), 'forced-replication-method',
        {'replication-method': 'FULL_TABLE'})
    mdata = metadata.write(mdata, (), 'table-key-properties', [])

    stream_name = f'Report_{report["DeveloperName"]}'
    return {
        'stream': stream_name,
        'tap_stream_id': stream_name,
        'schema': {
            'type': 'object',
            'additionalProperties': False,
            'properties': properties,
        },
        'metadata': metadata.to_list(mdata),
        'stream_meta': {
            'name': report["Name"],
            'folder': report["FolderName"],
            'Id': report["Id"],
        },
    }


def get_reports_list(sf):
output = []
Expand Down Expand Up @@ -365,37 +409,49 @@ def do_discover(sf, custom_tables=list()):
# Handle Reports
if sf.list_reports is True:
reports = get_reports_list(sf)

mdata = metadata.new()
properties = {}

if reports:
for report in reports:
field_name = f"Report_{report['DeveloperName']}"
properties[field_name] = dict(type=["null", "object", "string"])
if sf.discover_report_fields:
for report in reports:
report_metadata = sf.describe_report(report["Id"])
if report_metadata:
report_name = report["DeveloperName"]
fields = report_metadata.get("reportExtendedMetadata",{}).get("detailColumnInfo")
if not fields: # check how is the json response for these catalogs
LOGGER.info(f"No columns found for report {report_name}, not adding to catalog")
continue
# build schema from fields
report_stream = generate_report_schema(fields, report)
entries.append(report_stream)

mdata = metadata.write(
mdata,
(),
'forced-replication-method',
{'replication-method': 'FULL_TABLE'})

mdata = metadata.write(mdata, (), 'table-key-properties', [])

schema = {
'type': 'object',
'additionalProperties': False,
'properties': properties
}

entry = {
'stream': "ReportList",
'tap_stream_id': "ReportList",
'schema': schema,
'metadata': metadata.to_list(mdata)
}

entries.append(entry)
else:
mdata = metadata.new()
properties = {}
for report in reports:
field_name = f"Report_{report['DeveloperName']}"
properties[field_name] = dict(type=["null", "object", "string"])

mdata = metadata.write(
mdata,
(),
'forced-replication-method',
{'replication-method': 'FULL_TABLE'})

mdata = metadata.write(mdata, (), 'table-key-properties', [])

schema = {
'type': 'object',
'additionalProperties': False,
'properties': properties
}

entry = {
'stream': "ReportList",
'tap_stream_id': "ReportList",
'schema': schema,
'metadata': metadata.to_list(mdata)
}

entries.append(entry)

# For each custom setting field, remove its associated tag from entries
# See Blacklisting.md for more information
Expand Down Expand Up @@ -598,6 +654,8 @@ def main_impl():
default_start_date=CONFIG.get('start_date'),
api_type=CONFIG.get('api_type'),
list_reports=CONFIG.get('list_reports'),
discover_report_fields=CONFIG.get('discover_report_fields'),
report_metadata=CONFIG.get('report_metadata'),
list_views=CONFIG.get('list_views'),
api_version=CONFIG.get('api_version')
)
Expand Down
37 changes: 31 additions & 6 deletions tap_salesforce/salesforce/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
'email',
'complexvalue', # TODO: Unverified
'masterrecord',
'datacategorygroupreference'
'datacategorygroupreference',
'html'
])

NUMBER_TYPES = set([
Expand Down Expand Up @@ -137,11 +138,15 @@ def log_backoff_attempt(details):
LOGGER.info("ConnectionFailure detected, triggering backoff: %d try", details.get("tries"))


def field_to_property_schema(field, mdata): # pylint:disable=too-many-branches
def field_to_property_schema(field, mdata, is_report=False): # pylint:disable=too-many-branches
property_schema = {}

field_name = field['name']
sf_type = field['type']
if is_report:
field_name = field['label']
sf_type = field['dataType']
else:
field_name = field['name']
sf_type = field['type']

if sf_type in STRING_TYPES:
property_schema['type'] = "string"
Expand Down Expand Up @@ -207,6 +212,8 @@ def __init__(self,
default_start_date=None,
api_type=None,
list_reports=False,
discover_report_fields=False,
report_metadata = None,
list_views=False,
api_version=None):
self.api_type = api_type.upper() if api_type else None
Expand All @@ -218,6 +225,8 @@ def __init__(self,
self.access_token = None
self.instance_url = None
self.list_reports = list_reports
self.discover_report_fields = discover_report_fields
self.report_metadata = report_metadata
self.list_views= list_views
if isinstance(quota_percent_per_run, str) and quota_percent_per_run.strip() == '':
quota_percent_per_run = None
Expand Down Expand Up @@ -279,14 +288,14 @@ def check_rest_quota_usage(self, headers):
max_tries=10,
factor=2,
on_backoff=log_backoff_attempt)
def _make_request(self, http_method, url, headers=None, body=None, stream=False, params=None, validate_json=False, timeout=None):
def _make_request(self, http_method, url, headers=None, body=None, stream=False, params=None, validate_json=False, timeout=None, json=None):
if http_method == "GET":
LOGGER.info("Making %s request to %s with params: %s", http_method, url, params)
resp = self.session.get(url, headers=headers, stream=stream, params=params, timeout=timeout)
LOGGER.info("Completed %s request to %s with params: %s", http_method, url, params)
elif http_method == "POST":
LOGGER.info("Making %s request to %s with body %s", http_method, url, body)
resp = self.session.post(url, headers=headers, data=body)
resp = self.session.post(url, headers=headers, data=body, json=json, params=params)
else:
raise TapSalesforceException("Unsupported HTTP method")

Expand Down Expand Up @@ -362,7 +371,23 @@ def describe(self, sobject=None):
return None

return resp.json()

def describe_report(self, report_id):
"""Describes all fields for a specific report"""
headers = self._get_standard_headers()
endpoint = "analytics/reports/{}/describe".format(report_id)
endpoint_tag = report_id
url = self.data_url.format(self.instance_url, endpoint)

with metrics.http_request_timer("describe") as timer:
timer.tags['endpoint'] = endpoint_tag
try:
resp = self._make_request('GET', url, headers=headers, timeout=(5*60))
except Exception as e:
LOGGER.warning(f"Failed to describe report {report_id}, not adding to the catalog. Response: {e}")
return None

return resp.json()

def listview(self, sobject, listview):
headers = self._get_standard_headers()
Expand Down
106 changes: 75 additions & 31 deletions tap_salesforce/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from requests.exceptions import RequestException
from tap_salesforce.salesforce.bulk import Bulk
import base64
from openpyxl import load_workbook
from io import BytesIO

LOGGER = singer.get_logger()

Expand Down Expand Up @@ -261,40 +263,82 @@ def get_custom_table(stream):

if catalog_entry["stream"].startswith("Report_"):
report_name = catalog_entry["stream"].split("Report_", 1)[1]

reports = []
done = False
headers = sf._get_standard_headers()
endpoint = "queryAll"
params = {'q': 'SELECT Id,DeveloperName FROM Report'}
url = sf.data_url.format(sf.instance_url, endpoint)

while not done:
response = sf._make_request('GET', url, headers=headers, params=params)
response_json = response.json()
done = response_json.get("done")
reports.extend(response_json.get("records", []))
if not done:
url = sf.instance_url+response_json.get("nextRecordsUrl")

report = [r for r in reports if report_name==r["DeveloperName"]][0]
report_id = report["Id"]

endpoint = f"analytics/reports/{report_id}"
url = sf.data_url.format(sf.instance_url, endpoint)
response = sf._make_request('GET', url, headers=headers)
if sf.discover_report_fields:
report_id = catalog_entry["stream_meta"]["Id"]
endpoint = f"analytics/reports/{report_id}"
url = sf.data_url.format(sf.instance_url, endpoint)
# add headers to get report as xlsx
xlsx_headers = {"Accept": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"}
headers.update(xlsx_headers)
params = {"export": 1, "enc": "UTF-8", "xf": "xlsx", "data":2, "includeDetails": True}
# check if reportMetadata is in config for the current report
if sf.report_metadata:
body = [rm for rm in sf.report_metadata if rm.get(report_name)]
if body:
body = body[0][report_name]
headers.update({"Content-Type": "application/json"})
response = sf._make_request('POST', url, headers=headers, params=params, json=body)
# read the excel file by row, process and export the output
# openpyxl maintains typing
excel_file = BytesIO(response.content)
workbook = load_workbook(excel_file)
sheet = workbook.active
headers = None
first_field = next(iter(catalog_entry["schema"]["properties"]), None)
for row in sheet.iter_rows(values_only=True):
# look for the header row
if row[1] == first_field:
headers = row
continue
if headers:
if row[1].lower() == "total":
# end of data
return
processed_row = {headers[i]: row[i] for i in range(len(headers)) if headers[i] is not None}
singer.write_message(
singer.RecordMessage(
stream=(
stream_alias or stream),
record=processed_row,
version=stream_version,
time_extracted=start_time))


else:
reports = []
done = False
endpoint = "queryAll"
params = {'q': 'SELECT Id,DeveloperName FROM Report'}
url = sf.data_url.format(sf.instance_url, endpoint)

while not done:
response = sf._make_request('GET', url, headers=headers, params=params)
response_json = response.json()
done = response_json.get("done")
reports.extend(response_json.get("records", []))
if not done:
url = sf.instance_url+response_json.get("nextRecordsUrl")

report = [r for r in reports if report_name==r["DeveloperName"]][0]
report_id = report["Id"]

endpoint = f"analytics/reports/{report_id}"
url = sf.data_url.format(sf.instance_url, endpoint)
response = sf._make_request('GET', url, headers=headers)

with Transformer(pre_hook=transform_bulk_data_hook) as transformer:
rec = transformer.transform(response.json(), schema)
rec = fix_record_anytype(rec, schema)
stream = stream.replace("/","_")
singer.write_message(
singer.RecordMessage(
stream=(
stream_alias or stream),
record=rec,
version=stream_version,
time_extracted=start_time))
with Transformer(pre_hook=transform_bulk_data_hook) as transformer:
rec = transformer.transform(response.json(), schema)
rec = fix_record_anytype(rec, schema)
stream = stream.replace("/","_")
singer.write_message(
singer.RecordMessage(
stream=(
stream_alias or stream),
record=rec,
version=stream_version,
time_extracted=start_time))

elif "ListViews" == catalog_entry["stream"]:
headers = sf._get_standard_headers()
Expand Down