diff --git a/setup.py b/setup.py index bf64420..7b11d35 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,8 @@ install_requires=[ 'requests>=2.20.0,<=2.29.0', 'singer-python==5.3.1', - 'xmltodict==0.11.0' + 'xmltodict==0.11.0', + 'openpyxl==3.1.5' ], entry_points=''' [console_scripts] diff --git a/tap_salesforce/__init__.py b/tap_salesforce/__init__.py index ca771bc..71257b5 100644 --- a/tap_salesforce/__init__.py +++ b/tap_salesforce/__init__.py @@ -89,8 +89,11 @@ def build_state(raw_state, catalog): return state # pylint: disable=undefined-variable -def create_property_schema(field, mdata): - field_name = field['name'] +def create_property_schema(field, mdata, is_report=False): + if is_report: + field_name = field['label'] + else: + field_name = field['name'] if field_name == "Id": mdata = metadata.write( @@ -98,8 +101,10 @@ def create_property_schema(field, mdata): else: mdata = metadata.write( mdata, ('properties', field_name), 'inclusion', 'available') + if is_report: + mdata[('properties', field_name)]["field_metadata"] = field - property_schema, mdata = tap_salesforce.salesforce.field_to_property_schema(field, mdata) + property_schema, mdata = tap_salesforce.salesforce.field_to_property_schema(field, mdata, is_report) return (property_schema, mdata) @@ -203,6 +208,45 @@ def generate_schema(fields, sf, sobject_name, replication_key, sobject_descripti return entry +def generate_report_schema(fields, report): + mdata = metadata.new() + properties = {} + # Loop over the object's fields + for field_mdata in fields.values(): + field_name = field_mdata["label"] + property_schema, mdata = create_property_schema(field_mdata, mdata, True) + properties[field_name] = property_schema + + mdata = metadata.write( + mdata, + (), + 'forced-replication-method', + { + 'replication-method': 'FULL_TABLE'}) + + mdata = metadata.write(mdata, (), 'table-key-properties', []) + + schema = { + 'type': 'object', + 'additionalProperties': False, + 'properties': properties + } + + report_stream_name = f'Report_{report["DeveloperName"]}' + entry = { + 'stream': report_stream_name, + 'tap_stream_id': report_stream_name, + 'schema': schema, + 'metadata': metadata.to_list(mdata), + 'stream_meta': { + 'name': report["Name"], + 'folder': report["FolderName"], + 'Id': report["Id"], + } + } + + return entry + def get_reports_list(sf): output = [] @@ -365,37 +409,49 @@ def do_discover(sf, custom_tables=list()): # Handle Reports if sf.list_reports is True: reports = get_reports_list(sf) - - mdata = metadata.new() - properties = {} - if reports: - for report in reports: - field_name = f"Report_{report['DeveloperName']}" - properties[field_name] = dict(type=["null", "object", "string"]) + if sf.discover_report_fields: + for report in reports: + report_metadata = sf.describe_report(report["Id"]) + if report_metadata: + report_name = report["DeveloperName"] + fields = report_metadata.get("reportExtendedMetadata",{}).get("detailColumnInfo") + if not fields: # check how is the json response for these catalogs + LOGGER.info(f"No columns found for report {report_name}, not adding to catalog") + continue + # build schema from fields + report_stream = generate_report_schema(fields, report) + entries.append(report_stream) - mdata = metadata.write( - mdata, - (), - 'forced-replication-method', - {'replication-method': 'FULL_TABLE'}) - - mdata = metadata.write(mdata, (), 'table-key-properties', []) - - schema = { - 'type': 'object', - 'additionalProperties': False, - 'properties': properties - } - - entry = { - 'stream': "ReportList", - 'tap_stream_id': "ReportList", - 'schema': schema, - 'metadata': metadata.to_list(mdata) - } - - entries.append(entry) + else: + mdata = metadata.new() + properties = {} + for report in reports: + field_name = f"Report_{report['DeveloperName']}" + properties[field_name] = dict(type=["null", "object", "string"]) + + mdata = metadata.write( + mdata, + (), + 'forced-replication-method', + {'replication-method': 'FULL_TABLE'}) + + mdata = metadata.write(mdata, (), 'table-key-properties', []) + + schema = { + 'type': 'object', + 'additionalProperties': False, + 'properties': properties + } + + entry = { + 'stream': "ReportList", + 'tap_stream_id': "ReportList", + 'schema': schema, + 'metadata': metadata.to_list(mdata) + } + + entries.append(entry) # For each custom setting field, remove its associated tag from entries # See Blacklisting.md for more information @@ -598,6 +654,8 @@ def main_impl(): default_start_date=CONFIG.get('start_date'), api_type=CONFIG.get('api_type'), list_reports=CONFIG.get('list_reports'), + discover_report_fields=CONFIG.get('discover_report_fields'), + report_metadata=CONFIG.get('report_metadata'), list_views=CONFIG.get('list_views'), api_version=CONFIG.get('api_version') ) diff --git a/tap_salesforce/salesforce/__init__.py b/tap_salesforce/salesforce/__init__.py index 1cb0805..0b82acf 100644 --- a/tap_salesforce/salesforce/__init__.py +++ b/tap_salesforce/salesforce/__init__.py @@ -39,7 +39,8 @@ 'email', 'complexvalue', # TODO: Unverified 'masterrecord', - 'datacategorygroupreference' + 'datacategorygroupreference', + 'html' ]) NUMBER_TYPES = set([ @@ -137,11 +138,15 @@ def log_backoff_attempt(details): LOGGER.info("ConnectionFailure detected, triggering backoff: %d try", details.get("tries")) -def field_to_property_schema(field, mdata): # pylint:disable=too-many-branches +def field_to_property_schema(field, mdata, is_report=False): # pylint:disable=too-many-branches property_schema = {} - field_name = field['name'] - sf_type = field['type'] + if is_report: + field_name = field['label'] + sf_type = field['dataType'] + else: + field_name = field['name'] + sf_type = field['type'] if sf_type in STRING_TYPES: property_schema['type'] = "string" @@ -207,6 +212,8 @@ def __init__(self, default_start_date=None, api_type=None, list_reports=False, + discover_report_fields=False, + report_metadata = None, list_views=False, api_version=None): self.api_type = api_type.upper() if api_type else None @@ -218,6 +225,8 @@ def __init__(self, self.access_token = None self.instance_url = None self.list_reports = list_reports + self.discover_report_fields = discover_report_fields + self.report_metadata = report_metadata self.list_views= list_views if isinstance(quota_percent_per_run, str) and quota_percent_per_run.strip() == '': quota_percent_per_run = None @@ -279,14 +288,14 @@ def check_rest_quota_usage(self, headers): max_tries=10, factor=2, on_backoff=log_backoff_attempt) - def _make_request(self, http_method, url, headers=None, body=None, stream=False, params=None, validate_json=False, timeout=None): + def _make_request(self, http_method, url, headers=None, body=None, stream=False, params=None, validate_json=False, timeout=None, json=None): if http_method == "GET": LOGGER.info("Making %s request to %s with params: %s", http_method, url, params) resp = self.session.get(url, headers=headers, stream=stream, params=params, timeout=timeout) LOGGER.info("Completed %s request to %s with params: %s", http_method, url, params) elif http_method == "POST": LOGGER.info("Making %s request to %s with body %s", http_method, url, body) - resp = self.session.post(url, headers=headers, data=body) + resp = self.session.post(url, headers=headers, data=body, json=json, params=params) else: raise TapSalesforceException("Unsupported HTTP method") @@ -362,7 +371,23 @@ def describe(self, sobject=None): return None return resp.json() + + def describe_report(self, report_id): + """Describes all fields for a specific report""" + headers = self._get_standard_headers() + endpoint = "analytics/reports/{}/describe".format(report_id) + endpoint_tag = report_id + url = self.data_url.format(self.instance_url, endpoint) + + with metrics.http_request_timer("describe") as timer: + timer.tags['endpoint'] = endpoint_tag + try: + resp = self._make_request('GET', url, headers=headers, timeout=(5*60)) + except Exception as e: + LOGGER.warning(f"Failed to describe report {report_id}, not adding to the catalog. Response: {e}") + return None + return resp.json() def listview(self, sobject, listview): headers = self._get_standard_headers() diff --git a/tap_salesforce/sync.py b/tap_salesforce/sync.py index bc44823..0791679 100644 --- a/tap_salesforce/sync.py +++ b/tap_salesforce/sync.py @@ -6,6 +6,8 @@ from requests.exceptions import RequestException from tap_salesforce.salesforce.bulk import Bulk import base64 +from openpyxl import load_workbook +from io import BytesIO LOGGER = singer.get_logger() @@ -261,40 +263,82 @@ def get_custom_table(stream): if catalog_entry["stream"].startswith("Report_"): report_name = catalog_entry["stream"].split("Report_", 1)[1] - - reports = [] - done = False headers = sf._get_standard_headers() - endpoint = "queryAll" - params = {'q': 'SELECT Id,DeveloperName FROM Report'} - url = sf.data_url.format(sf.instance_url, endpoint) - - while not done: - response = sf._make_request('GET', url, headers=headers, params=params) - response_json = response.json() - done = response_json.get("done") - reports.extend(response_json.get("records", [])) - if not done: - url = sf.instance_url+response_json.get("nextRecordsUrl") - - report = [r for r in reports if report_name==r["DeveloperName"]][0] - report_id = report["Id"] - endpoint = f"analytics/reports/{report_id}" - url = sf.data_url.format(sf.instance_url, endpoint) - response = sf._make_request('GET', url, headers=headers) + if sf.discover_report_fields: + report_id = catalog_entry["stream_meta"]["Id"] + endpoint = f"analytics/reports/{report_id}" + url = sf.data_url.format(sf.instance_url, endpoint) + # add headers to get report as xlsx + xlsx_headers = {"Accept": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"} + headers.update(xlsx_headers) + params = {"export": 1, "enc": "UTF-8", "xf": "xlsx", "data":2, "includeDetails": True} + # check if reportMetadata is in config for the current report + if sf.report_metadata: + body = [rm for rm in sf.report_metadata if rm.get(report_name)] + if body: + body = body[0][report_name] + headers.update({"Content-Type": "application/json"}) + response = sf._make_request('POST', url, headers=headers, params=params, json=body) + # read the excel file by row, process and export the output + # openpyxl maintains typing + excel_file = BytesIO(response.content) + workbook = load_workbook(excel_file) + sheet = workbook.active + headers = None + first_field = next(iter(catalog_entry["schema"]["properties"]), None) + for row in sheet.iter_rows(values_only=True): + # look for the header row + if row[1] == first_field: + headers = row + continue + if headers: + if row[1].lower() == "total": + # end of data + return + processed_row = {headers[i]: row[i] for i in range(len(headers)) if headers[i] is not None} + singer.write_message( + singer.RecordMessage( + stream=( + stream_alias or stream), + record=processed_row, + version=stream_version, + time_extracted=start_time)) + + + else: + reports = [] + done = False + endpoint = "queryAll" + params = {'q': 'SELECT Id,DeveloperName FROM Report'} + url = sf.data_url.format(sf.instance_url, endpoint) + + while not done: + response = sf._make_request('GET', url, headers=headers, params=params) + response_json = response.json() + done = response_json.get("done") + reports.extend(response_json.get("records", [])) + if not done: + url = sf.instance_url+response_json.get("nextRecordsUrl") + + report = [r for r in reports if report_name==r["DeveloperName"]][0] + report_id = report["Id"] + + endpoint = f"analytics/reports/{report_id}" + url = sf.data_url.format(sf.instance_url, endpoint) + response = sf._make_request('GET', url, headers=headers) - with Transformer(pre_hook=transform_bulk_data_hook) as transformer: - rec = transformer.transform(response.json(), schema) - rec = fix_record_anytype(rec, schema) - stream = stream.replace("/","_") - singer.write_message( - singer.RecordMessage( - stream=( - stream_alias or stream), - record=rec, - version=stream_version, - time_extracted=start_time)) + with Transformer(pre_hook=transform_bulk_data_hook) as transformer: + rec = transformer.transform(response.json(), schema) + rec = fix_record_anytype(rec, schema) + stream = stream.replace("/","_") + singer.write_message( + singer.RecordMessage( + stream=( + stream_alias or stream), + record=rec, + version=stream_version, + time_extracted=start_time)) elif "ListViews" == catalog_entry["stream"]: headers = sf._get_standard_headers()