diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index ded2b90..0000000 Binary files a/.DS_Store and /dev/null differ diff --git a/.gitignore b/.gitignore index 8fc7665..9ecc223 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ tests/ .secrets +.DS_Store # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/tap_quickbooks/__init__.py b/tap_quickbooks/__init__.py index 488d1ef..06b3eef 100644 --- a/tap_quickbooks/__init__.py +++ b/tap_quickbooks/__init__.py @@ -5,82 +5,91 @@ import singer.utils as singer_utils from singer import metadata, metrics import tap_quickbooks.quickbooks as quickbooks -from tap_quickbooks.sync import (sync_stream, get_stream_version) +from tap_quickbooks.sync import sync_stream, get_stream_version from tap_quickbooks.quickbooks import Quickbooks from tap_quickbooks.quickbooks.exceptions import ( - TapQuickbooksException, TapQuickbooksQuotaExceededException) + TapQuickbooksException, + TapQuickbooksQuotaExceededException, +) LOGGER = singer.get_logger() REQUIRED_CONFIG_KEYS = [ - 'refresh_token', - 'client_id', - 'client_secret', - 'start_date', - 'realmId', - 'select_fields_by_default' - ] + "refresh_token", + "client_id", + "client_secret", + "start_date", + "realmId", + "select_fields_by_default", +] CONFIG = { - 'refresh_token': None, - 'client_id': None, - 'client_secret': None, - 'start_date': None, - 'include_deleted': None + "refresh_token": None, + "client_id": None, + "client_secret": None, + "start_date": None, + "include_deleted": None, } -REPLICATION_KEY="MetaData.LastUpdatedTime" +REPLICATION_KEY = "MetaData.LastUpdatedTime" + def stream_is_selected(mdata): - return mdata.get((), {}).get('selected', False) + return mdata.get((), {}).get("selected", False) + def build_state(raw_state, catalog): state = {} - for catalog_entry in catalog['streams']: - tap_stream_id = catalog_entry['tap_stream_id'] - catalog_metadata = metadata.to_map(catalog_entry['metadata']) - replication_method = catalog_metadata.get((), {}).get('replication-method') + for catalog_entry in catalog["streams"]: + tap_stream_id = catalog_entry["tap_stream_id"] + catalog_metadata = metadata.to_map(catalog_entry["metadata"]) + replication_method = catalog_metadata.get((), {}).get("replication-method") - version = singer.get_bookmark(raw_state, - tap_stream_id, - 'version') + version = singer.get_bookmark(raw_state, tap_stream_id, "version") # Preserve state that deals with resuming an incomplete bulk job - if singer.get_bookmark(raw_state, tap_stream_id, 'JobID'): - job_id = singer.get_bookmark(raw_state, tap_stream_id, 'JobID') - batches = singer.get_bookmark(raw_state, tap_stream_id, 'BatchIDs') - current_bookmark = singer.get_bookmark(raw_state, tap_stream_id, 'JobHighestBookmarkSeen') - state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id) - state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batches) - state = singer.write_bookmark(state, tap_stream_id, 'JobHighestBookmarkSeen', current_bookmark) - - if replication_method == 'INCREMENTAL': - replication_key = catalog_metadata.get((), {}).get('replication-key') - replication_key_value = singer.get_bookmark(raw_state, - tap_stream_id, - replication_key) + if singer.get_bookmark(raw_state, tap_stream_id, "JobID"): + job_id = singer.get_bookmark(raw_state, tap_stream_id, "JobID") + batches = singer.get_bookmark(raw_state, tap_stream_id, "BatchIDs") + current_bookmark = singer.get_bookmark( + raw_state, tap_stream_id, "JobHighestBookmarkSeen" + ) + state = singer.write_bookmark(state, 
tap_stream_id, "JobID", job_id) + state = singer.write_bookmark(state, tap_stream_id, "BatchIDs", batches) + state = singer.write_bookmark( + state, tap_stream_id, "JobHighestBookmarkSeen", current_bookmark + ) + + if replication_method == "INCREMENTAL": + replication_key = catalog_metadata.get((), {}).get("replication-key") + replication_key_value = singer.get_bookmark( + raw_state, tap_stream_id, replication_key + ) if version is not None: - state = singer.write_bookmark( - state, tap_stream_id, 'version', version) + state = singer.write_bookmark(state, tap_stream_id, "version", version) if replication_key_value is not None: state = singer.write_bookmark( - state, tap_stream_id, replication_key, replication_key_value) - elif replication_method == 'FULL_TABLE' and version is None: - state = singer.write_bookmark(state, tap_stream_id, 'version', version) + state, tap_stream_id, replication_key, replication_key_value + ) + elif replication_method == "FULL_TABLE" and version is None: + state = singer.write_bookmark(state, tap_stream_id, "version", version) return state + # pylint: disable=undefined-variable def create_property_schema(field, mdata): - field_name = field['name'] + field_name = field["name"] if field_name == "Id": mdata = metadata.write( - mdata, ('properties', field_name), 'inclusion', 'automatic') + mdata, ("properties", field_name), "inclusion", "automatic" + ) else: mdata = metadata.write( - mdata, ('properties', field_name), 'inclusion', 'available') + mdata, ("properties", field_name), "inclusion", "available" + ) property_schema, mdata = quickbooks.field_to_property_schema(field, mdata) @@ -90,8 +99,8 @@ def create_property_schema(field, mdata): # pylint: disable=too-many-branches,too-many-statements def do_discover(qb): """Describes a Quickbooks instance's objects and generates a JSON schema for each field.""" - objects_to_discover = qb.describe() - key_properties = ['Id'] + objects_to_discover = qb.describe() + key_properties = ["Id"] qb_custom_setting_objects = [] object_to_tag_references = {} @@ -100,70 +109,72 @@ def do_discover(qb): entries = [] for sobject_name in objects_to_discover: - fields = qb.describe(sobject_name) replication_key = REPLICATION_KEY - if sobject_name.endswith('Report'): + if sobject_name.endswith("Report"): replication_key = None - properties = {} mdata = metadata.new() # Loop over the object's fields for f in fields: - field_name = f['name'] + field_name = f["name"] - property_schema, mdata = create_property_schema( - f, mdata) + property_schema, mdata = create_property_schema(f, mdata) - inclusion = metadata.get( - mdata, ('properties', field_name), 'inclusion') + inclusion = metadata.get(mdata, ("properties", field_name), "inclusion") if qb.select_fields_by_default: mdata = metadata.write( - mdata, ('properties', field_name), 'selected-by-default', True) + mdata, ("properties", field_name), "selected-by-default", True + ) properties[field_name] = property_schema if replication_key: mdata = metadata.write( - mdata, ('properties', replication_key), 'inclusion', 'automatic') + mdata, ("properties", replication_key), "inclusion", "automatic" + ) if replication_key: mdata = metadata.write( - mdata, (), 'valid-replication-keys', [replication_key]) + mdata, (), "valid-replication-keys", [replication_key] + ) else: mdata = metadata.write( mdata, (), - 'forced-replication-method', + "forced-replication-method", { - 'replication-method': 'FULL_TABLE', - 'reason': 'No replication keys found from the Quickbooks API'}) - if sobject_name in 
["GeneralLedgerCashReport","GeneralLedgerAccrualReport"]: + "replication-method": "FULL_TABLE", + "reason": "No replication keys found from the Quickbooks API", + }, + ) + if sobject_name in ["GeneralLedgerCashReport", "GeneralLedgerAccrualReport"]: key_properties = [] - mdata = metadata.write(mdata, (), 'table-key-properties', key_properties) + mdata = metadata.write(mdata, (), "table-key-properties", key_properties) schema = { - 'type': 'object', - 'additionalProperties': False, - 'properties': properties + "type": "object", + "additionalProperties": False, + "properties": properties, } entry = { - 'stream': sobject_name, - 'tap_stream_id': sobject_name, - 'schema': schema, - 'metadata': metadata.to_list(mdata) + "stream": sobject_name, + "tap_stream_id": sobject_name, + "schema": schema, + "metadata": metadata.to_list(mdata), } entries.append(entry) - result = {'streams': entries} + result = {"streams": entries} json.dump(result, sys.stdout, indent=4) + def do_sync(qb, catalog, state, state_passed): starting_stream = state.get("current_stream") @@ -174,16 +185,17 @@ def do_sync(qb, catalog, state, state_passed): for catalog_entry in catalog["streams"]: stream_version = get_stream_version(catalog_entry, state) - stream = catalog_entry['stream'] - stream_alias = catalog_entry.get('stream_alias') + stream = catalog_entry["stream"] + stream_alias = catalog_entry.get("stream_alias") stream_name = catalog_entry["tap_stream_id"] activate_version_message = singer.ActivateVersionMessage( - stream=(stream_alias or stream), version=stream_version) + stream=(stream_alias or stream), version=stream_version + ) - catalog_metadata = metadata.to_map(catalog_entry['metadata']) - replication_key = catalog_metadata.get((), {}).get('replication-key') + catalog_metadata = metadata.to_map(catalog_entry["metadata"]) + replication_key = catalog_metadata.get((), {}).get("replication-key") - mdata = metadata.to_map(catalog_entry['metadata']) + mdata = metadata.to_map(catalog_entry["metadata"]) if not stream_is_selected(mdata): LOGGER.info("%s: Skipping - not selected", stream_name) @@ -201,15 +213,20 @@ def do_sync(qb, catalog, state, state_passed): state["current_stream"] = stream_name singer.write_state(state) - key_properties = metadata.to_map(catalog_entry['metadata']).get((), {}).get('table-key-properties') + key_properties = ( + metadata.to_map(catalog_entry["metadata"]) + .get((), {}) + .get("table-key-properties") + ) singer.write_schema( stream, - catalog_entry['schema'], + catalog_entry["schema"], key_properties, replication_key, - stream_alias) + stream_alias, + ) - job_id = singer.get_bookmark(state, catalog_entry['tap_stream_id'], 'JobID') + job_id = singer.get_bookmark(state, catalog_entry["tap_stream_id"], "JobID") if job_id: with metrics.record_counter(stream) as counter: # Remove Job info from state once we complete this resumed query. One of a few cases could have occurred: @@ -217,30 +234,41 @@ def do_sync(qb, catalog, state, state_passed): # 2. The job partially completed, in which case make JobHighestBookmarkSeen the new bookmark, or # existing bookmark if no bookmark exists for the Job. # 3. 
The job completely failed, in which case maintain the existing bookmark, or None if no bookmark - state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('JobID', None) - state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('BatchIDs', None) - bookmark = state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}) \ - .pop('JobHighestBookmarkSeen', None) - existing_bookmark = state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}) \ - .pop(replication_key, None) + state.get("bookmarks", {}).get(catalog_entry["tap_stream_id"], {}).pop( + "JobID", None + ) + state.get("bookmarks", {}).get(catalog_entry["tap_stream_id"], {}).pop( + "BatchIDs", None + ) + bookmark = ( + state.get("bookmarks", {}) + .get(catalog_entry["tap_stream_id"], {}) + .pop("JobHighestBookmarkSeen", None) + ) + existing_bookmark = ( + state.get("bookmarks", {}) + .get(catalog_entry["tap_stream_id"], {}) + .pop(replication_key, None) + ) state = singer.write_bookmark( state, - catalog_entry['tap_stream_id'], + catalog_entry["tap_stream_id"], replication_key, - bookmark or existing_bookmark) # If job is removed, reset to existing bookmark or None + bookmark or existing_bookmark, + ) # If job is removed, reset to existing bookmark or None singer.write_state(state) else: # Tables with a replication_key or an empty bookmark will emit an # activate_version at the beginning of their sync - bookmark_is_empty = state.get('bookmarks', {}).get( - catalog_entry['tap_stream_id']) is None + bookmark_is_empty = ( + state.get("bookmarks", {}).get(catalog_entry["tap_stream_id"]) is None + ) if replication_key or bookmark_is_empty: singer.write_message(activate_version_message) - state = singer.write_bookmark(state, - catalog_entry['tap_stream_id'], - 'version', - stream_version) + state = singer.write_bookmark( + state, catalog_entry["tap_stream_id"], "version", stream_version + ) counter = sync_stream(qb, catalog_entry, state, state_passed) LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter.value) @@ -248,6 +276,7 @@ def do_sync(qb, catalog, state, state_passed): singer.write_state(state) LOGGER.info("Finished sync") + def main_impl(): args = singer_utils.parse_args(REQUIRED_CONFIG_KEYS) @@ -271,9 +300,17 @@ def main_impl(): report_period_days = CONFIG.get('report_period_days'), reports_full_sync = CONFIG.get('reports_full_sync', False), gl_full_sync = CONFIG.get('gl_full_sync'), + pl_detail_full_sync = CONFIG.get('pl_detail_full_sync'), + monthly_balance_sheet_full_sync = CONFIG.get('monthly_balance_sheet_full_sync'), gl_weekly = CONFIG.get('gl_weekly', False), gl_daily = CONFIG.get('gl_daily', False), - gl_basic_fields = CONFIG.get('gl_basic_fields', False)) + gl_basic_fields = CONFIG.get('gl_basic_fields', False), + pnl_adjusted_gain_loss=CONFIG.get("pnl_adjusted_gain_loss", False), + pnl_monthly=CONFIG.get("pnl_monthly", False), + ar_aging_report_date=CONFIG.get("ar_aging_report_date", False), + ar_aging_report_dates=CONFIG.get("ar_aging_report_dates", False), + ) + qb.login() if args.discover: @@ -288,7 +325,8 @@ def main_impl(): if qb.rest_requests_attempted > 0: LOGGER.debug( "This job used %s REST requests towards the Quickbooks quota.", - qb.rest_requests_attempted) + qb.rest_requests_attempted, + ) if qb.login_timer: qb.login_timer.cancel() @@ -306,5 +344,6 @@ def main(): LOGGER.critical(e) raise e + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/tap_quickbooks/quickbooks/__init__.py b/tap_quickbooks/quickbooks/__init__.py 
index 15569e3..06f958d 100644 --- a/tap_quickbooks/quickbooks/__init__.py +++ b/tap_quickbooks/quickbooks/__init__.py @@ -8,24 +8,36 @@ from requests.exceptions import RequestException import singer import singer.utils as singer_utils -import os; +import os from typing import Dict from singer import metadata, metrics -from tap_quickbooks.quickbooks.reportstreams.MonthlyBalanceSheetReport import MonthlyBalanceSheetReport -from tap_quickbooks.quickbooks.reportstreams.ProfitAndLossDetailReport import ProfitAndLossDetailReport -from tap_quickbooks.quickbooks.reportstreams.BalanceSheetReport import BalanceSheetReport -from tap_quickbooks.quickbooks.reportstreams.GeneralLedgerAccrualReport import GeneralLedgerAccrualReport -from tap_quickbooks.quickbooks.reportstreams.GeneralLedgerCashReport import GeneralLedgerCashReport +from tap_quickbooks.quickbooks.reportstreams.MonthlyBalanceSheetReport import ( + MonthlyBalanceSheetReport, +) +from tap_quickbooks.quickbooks.reportstreams.ProfitAndLossDetailReport import ( + ProfitAndLossDetailReport, +) +from tap_quickbooks.quickbooks.reportstreams.BalanceSheetReport import ( + BalanceSheetReport, +) +from tap_quickbooks.quickbooks.reportstreams.GeneralLedgerAccrualReport import ( + GeneralLedgerAccrualReport, +) +from tap_quickbooks.quickbooks.reportstreams.GeneralLedgerCashReport import ( + GeneralLedgerCashReport, +) from tap_quickbooks.quickbooks.reportstreams.CashFlowReport import CashFlowReport from tap_quickbooks.quickbooks.reportstreams.DailyCashFlowReport import DailyCashFlowReport from tap_quickbooks.quickbooks.reportstreams.MonthlyCashFlowReport import MonthlyCashFlowReport from tap_quickbooks.quickbooks.reportstreams.TransactionListReport import TransactionListReport from tap_quickbooks.quickbooks.reportstreams.ARAgingSummaryReport import ARAgingSummaryReport +from tap_quickbooks.quickbooks.reportstreams.ArAgingDetailReport import ARAgingDetailReport from tap_quickbooks.quickbooks.rest import Rest from tap_quickbooks.quickbooks.exceptions import ( TapQuickbooksException, - TapQuickbooksQuotaExceededException) + TapQuickbooksQuotaExceededException, +) LOGGER = singer.get_logger() @@ -36,7 +48,9 @@ def log_backoff_attempt(details): - LOGGER.info("ConnectionError detected, triggering backoff: %d try", details.get("tries")) + LOGGER.info( + "ConnectionError detected, triggering backoff: %d try", details.get("tries") + ) def _get_abs_path(path: str) -> str: @@ -44,16 +58,16 @@ def _get_abs_path(path: str) -> str: def _load_object_definitions() -> Dict: - '''Loads a JSON schema file for a given + """Loads a JSON schema file for a given Quickbooks Report resource into a dict representation. 
- ''' + """ schema_path = _get_abs_path("schemas") return singer.utils.load_json(f"{schema_path}/object_definition.json") def read_json_file(filename): # read file - with open(f"{filename}", 'r') as filetoread: + with open(f"{filename}", "r") as filetoread: data = filetoread.read() # parse file @@ -63,7 +77,7 @@ def read_json_file(filename): def write_json_file(filename, content): - with open(filename, 'w') as f: + with open(filename, "w") as f: json.dump(content, f, indent=4) @@ -72,55 +86,24 @@ def write_json_file(filename, content): def field_to_property_schema(field, mdata): # pylint:disable=too-many-branches + number_type = {"type": ["null", "number"]} - number_type = { - "type": [ - "null", - "number" - ] - } + string_type = {"type": ["string", "null"]} - string_type = { - "type": [ - "string", - "null" - ] - } - - boolean_type = { - "type": [ - "boolean", - "null" - ] - } + boolean_type = {"type": ["boolean", "null"]} - datetime_type = { - "anyOf": [ - { - "type": "string", - "format": "date-time" - }, - string_type - ] - } + datetime_type = {"anyOf": [{"type": "string", "format": "date-time"}, string_type]} - object_type = { - "type": [ - "null", - "object" - ] - } + object_type = {"type": ["null", "object"]} - array_type = { - "type": ["null", "array"] - } + array_type = {"type": ["null", "array"]} ref_type = { "type": object_type["type"], "properties": { "value": string_type, "name": string_type, - } + }, } qb_types = { @@ -134,7 +117,7 @@ def field_to_property_schema(field, mdata): # pylint:disable=too-many-branches "email": string_type, "address": string_type, "metadata": string_type, - "ref_type": ref_type + "ref_type": ref_type, } qb_types["custom_field"] = { @@ -143,8 +126,8 @@ def field_to_property_schema(field, mdata): # pylint:disable=too-many-branches "DefinitionId": string_type, "Name": string_type, "Type": string_type, - "StringValue": string_type - } + "StringValue": string_type, + }, } qb_types["invoice_line"] = { @@ -165,30 +148,28 @@ def field_to_property_schema(field, mdata): # pylint:disable=too-many-branches "Qty": number_type, "UnitPrice": number_type, "ServiceDate": qb_types["datetime"], - "Description" : string_type - } + "Description": string_type, + }, }, "SubTotalLineDetail": { "type": object_type["type"], - "properties": { - "ItemRef": qb_types["ref_type"] - } + "properties": {"ItemRef": qb_types["ref_type"]}, }, "DiscountLineDetail": { "type": object_type["type"], "properties": { "DiscountAccountRef": qb_types["object_reference"], - "DiscountPercent": number_type - } + "DiscountPercent": number_type, + }, }, "DescriptionLineDetail": { "type": object_type["type"], "properties": { "TaxCodeRef": qb_types["object_reference"], - "ServiceDate": qb_types["datetime"] - } - } - } + "ServiceDate": qb_types["datetime"], + }, + }, + }, } qb_types["journal_entry_line"] = { @@ -206,27 +187,28 @@ def field_to_property_schema(field, mdata): # pylint:disable=too-many-branches "type": object_type["type"], "properties": { "Type": string_type, - "EntityRef": qb_types["object_reference"] - } + "EntityRef": qb_types["object_reference"], + }, }, "AccountRef": qb_types["object_reference"], "ClassRef": qb_types["object_reference"], - "DepartmentRef": qb_types["object_reference"] - } - } - } + "DepartmentRef": qb_types["object_reference"], + }, + }, + }, } - qb_type = field['type'] + qb_type = field["type"] property_schema = qb_types[qb_type] - if qb_type == 'array': - property_schema["items"] = qb_types[field['child_type']] + if qb_type == "array": + property_schema["items"] = 
qb_types[field["child_type"]] return property_schema, mdata -class Quickbooks(): +class Quickbooks: # pylint: disable=too-many-instance-attributes,too-many-arguments + def __init__(self, refresh_token=None, token=None, @@ -242,13 +224,22 @@ def __init__(self, report_period_days = None, reports_full_sync = None, gl_full_sync = None, + pl_detail_full_sync = None, + monthly_balance_sheet_full_sync = None, gl_weekly = None, gl_daily = None, gl_basic_fields = None, - realm_id=None): + realm_id=None, + pnl_adjusted_gain_loss=None, + pnl_monthly=None, + ar_aging_report_date=None, + ar_aging_report_dates=None, + ): self.api_type = api_type.upper() if api_type else None self.report_period_days = report_period_days self.gl_full_sync = gl_full_sync + self.pl_detail_full_sync = pl_detail_full_sync + self.monthly_balance_sheet_full_sync = monthly_balance_sheet_full_sync self.reports_full_sync = reports_full_sync self.gl_weekly = gl_weekly self.gl_daily = gl_daily @@ -261,24 +252,40 @@ def __init__(self, self.qb_client_secret = qb_client_secret self.session = requests.Session() self.access_token = None - - self.base_url = "https://sandbox-quickbooks.api.intuit.com/v3/company/" if is_sandbox is True else 'https://quickbooks.api.intuit.com/v3/company/' + self.pnl_adjusted_gain_loss = pnl_adjusted_gain_loss + self.pnl_monthly = pnl_monthly + self.ar_aging_report_date = ar_aging_report_date + self.ar_aging_report_dates = ar_aging_report_dates + self.base_url = ( + "https://sandbox-quickbooks.api.intuit.com/v3/company/" + if is_sandbox is True + else "https://quickbooks.api.intuit.com/v3/company/" + ) self.instance_url = f"{self.base_url}{realm_id}" LOGGER.info(f"Instance URL :- {self.instance_url}") - if isinstance(quota_percent_per_run, str) and quota_percent_per_run.strip() == '': + if ( + isinstance(quota_percent_per_run, str) + and quota_percent_per_run.strip() == "" + ): quota_percent_per_run = None - if isinstance(quota_percent_total, str) and quota_percent_total.strip() == '': + if isinstance(quota_percent_total, str) and quota_percent_total.strip() == "": quota_percent_total = None - self.quota_percent_per_run = float( - quota_percent_per_run) if quota_percent_per_run is not None else 25 - self.quota_percent_total = float( - quota_percent_total) if quota_percent_total is not None else 80 - self.is_sandbox = is_sandbox is True or (isinstance(is_sandbox, str) and is_sandbox.lower() == 'true') + self.quota_percent_per_run = ( + float(quota_percent_per_run) if quota_percent_per_run is not None else 25 + ) + self.quota_percent_total = ( + float(quota_percent_total) if quota_percent_total is not None else 80 + ) + self.is_sandbox = is_sandbox is True or ( + isinstance(is_sandbox, str) and is_sandbox.lower() == "true" + ) self.select_fields_by_default = select_fields_by_default is True or ( - isinstance(select_fields_by_default, str) and select_fields_by_default.lower() == 'true') + isinstance(select_fields_by_default, str) + and select_fields_by_default.lower() == "true" + ) self.default_start_date = default_start_date self.rest_requests_attempted = 0 self.jobs_completed = 0 @@ -294,7 +301,7 @@ def _get_standard_headers(self): # pylint: disable=anomalous-backslash-in-string,line-too-long def check_rest_quota_usage(self, headers): - match = re.search('^api-usage=(\d+)/(\d+)$', headers.get('Sforce-Limit-Info')) + match = re.search("^api-usage=(\d+)/(\d+)$", headers.get("Sforce-Limit-Info")) if match is None: return @@ -307,31 +314,42 @@ def check_rest_quota_usage(self, headers): max_requests_for_run = 
int((self.quota_percent_per_run * allotted) / 100) if percent_used_from_total > self.quota_percent_total: - total_message = ("Quickbooks has reported {}/{} ({:3.2f}%) total REST quota " + - "used across all Quickbooks Applications. Terminating " + - "replication to not continue past configured percentage " + - "of {}% total quota.").format(remaining, - allotted, - percent_used_from_total, - self.quota_percent_total) + total_message = ( + "Quickbooks has reported {}/{} ({:3.2f}%) total REST quota " + + "used across all Quickbooks Applications. Terminating " + + "replication to not continue past configured percentage " + + "of {}% total quota." + ).format( + remaining, allotted, percent_used_from_total, self.quota_percent_total + ) raise TapQuickbooksQuotaExceededException(total_message) elif self.rest_requests_attempted > max_requests_for_run: - partial_message = ("This replication job has made {} REST requests ({:3.2f}% of " + - "total quota). Terminating replication due to allotted " + - "quota of {}% per replication.").format(self.rest_requests_attempted, - (self.rest_requests_attempted / allotted) * 100, - self.quota_percent_per_run) + partial_message = ( + "This replication job has made {} REST requests ({:3.2f}% of " + + "total quota). Terminating replication due to allotted " + + "quota of {}% per replication." + ).format( + self.rest_requests_attempted, + (self.rest_requests_attempted / allotted) * 100, + self.quota_percent_per_run, + ) raise TapQuickbooksQuotaExceededException(partial_message) # pylint: disable=too-many-arguments - @backoff.on_exception(backoff.expo, - requests.exceptions.ConnectionError, - max_tries=10, - factor=2, - on_backoff=log_backoff_attempt) - def _make_request(self, http_method, url, headers=None, body=None, stream=False, params=None): + @backoff.on_exception( + backoff.expo, + requests.exceptions.ConnectionError, + max_tries=10, + factor=2, + on_backoff=log_backoff_attempt, + ) + def _make_request( + self, http_method, url, headers=None, body=None, stream=False, params=None + ): if http_method == "GET": - LOGGER.info("Making %s request to %s with params: %s", http_method, url, params) + LOGGER.info( + "Making %s request to %s with params: %s", http_method, url, params + ) resp = self.session.get(url, headers=headers, stream=stream, params=params) elif http_method == "POST": LOGGER.info("Making %s request to %s with body %s", http_method, url, body) @@ -344,7 +362,7 @@ def _make_request(self, http_method, url, headers=None, body=None, stream=False, except RequestException as ex: raise ex - if resp.headers.get('Sforce-Limit-Info') is not None: + if resp.headers.get("Sforce-Limit-Info") is not None: self.rest_requests_attempted += 1 self.check_rest_quota_usage(resp.headers) @@ -352,35 +370,42 @@ def _make_request(self, http_method, url, headers=None, body=None, stream=False, def login(self): if self.is_sandbox: - login_url = 'https://oauth.platform.intuit.com/oauth2/v1/tokens/bearer' + login_url = "https://oauth.platform.intuit.com/oauth2/v1/tokens/bearer" else: - login_url = 'https://oauth.platform.intuit.com/oauth2/v1/tokens/bearer' + login_url = "https://oauth.platform.intuit.com/oauth2/v1/tokens/bearer" - login_body = {'grant_type': 'refresh_token', 'client_id': self.qb_client_id, - 'client_secret': self.qb_client_secret, 'refresh_token': self.refresh_token} + login_body = { + "grant_type": "refresh_token", + "client_id": self.qb_client_id, + "client_secret": self.qb_client_secret, + "refresh_token": self.refresh_token, + } LOGGER.info("Attempting login 
via OAuth2") resp = None try: - resp = self._make_request("POST", login_url, body=login_body, - headers={"Content-Type": "application/x-www-form-urlencoded"}) + resp = self._make_request( + "POST", + login_url, + body=login_body, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) LOGGER.info("OAuth2 login successful") auth = resp.json() - self.access_token = auth['access_token'] - - new_refresh_token = auth['refresh_token'] + self.access_token = auth["access_token"] + new_refresh_token = auth["refresh_token"] # persist access_token parser = argparse.ArgumentParser() - parser.add_argument('-c', '--config', help='Config file', required=True) + parser.add_argument("-c", "--config", help="Config file", required=True) _args, unknown = parser.parse_known_args() config_file = _args.config config_content = read_json_file(config_file) - config_content['access_token'] = self.access_token + config_content["access_token"] = self.access_token write_json_file(config_file, config_content) # Check if the refresh token is update, if so update the config file with new refresh token. @@ -388,26 +413,32 @@ def login(self): LOGGER.info(f"Old refresh token [{self.refresh_token}] expired.") LOGGER.info("New Refresh token: {}".format(new_refresh_token)) parser = argparse.ArgumentParser() - parser.add_argument('-c', '--config', help='Config file', required=True) + parser.add_argument("-c", "--config", help="Config file", required=True) _args, unknown = parser.parse_known_args() config_file = _args.config config_content = read_json_file(config_file) - config_content['refresh_token'] = new_refresh_token + config_content["refresh_token"] = new_refresh_token write_json_file(config_file, config_content) self.refresh_token = new_refresh_token except Exception as e: error_message = str(e) - if resp is None and hasattr(e, 'response') and e.response is not None: # pylint:disable=no-member + if ( + resp is None and hasattr(e, "response") and e.response is not None + ): # pylint:disable=no-member resp = e.response # pylint:disable=no-member # NB: requests.models.Response is always falsy here. 
It is false if status code >= 400 if isinstance(resp, requests.models.Response): - error_message = error_message + ", Response from Quickbooks: {}".format(resp.text) + error_message = error_message + ", Response from Quickbooks: {}".format( + resp.text + ) raise Exception(error_message) from e finally: LOGGER.info("Starting new login timer") - self.login_timer = threading.Timer(REFRESH_TOKEN_EXPIRATION_PERIOD, self.login) + self.login_timer = threading.Timer( + REFRESH_TOKEN_EXPIRATION_PERIOD, self.login + ) self.login_timer.start() def describe(self, sobject=None): @@ -419,34 +450,40 @@ def describe(self, sobject=None): # pylint: disable=no-self-use def _get_selected_properties(self, catalog_entry): - mdata = metadata.to_map(catalog_entry['metadata']) - properties = catalog_entry['schema'].get('properties', {}) - - return [k for k in properties.keys() - if singer.should_sync_field(metadata.get(mdata, ('properties', k), 'inclusion'), - metadata.get(mdata, ('properties', k), 'selected'), - self.select_fields_by_default)] + mdata = metadata.to_map(catalog_entry["metadata"]) + properties = catalog_entry["schema"].get("properties", {}) + + return [ + k + for k in properties.keys() + if singer.should_sync_field( + metadata.get(mdata, ("properties", k), "inclusion"), + metadata.get(mdata, ("properties", k), "selected"), + self.select_fields_by_default, + ) + ] def get_start_date(self, state, catalog_entry): - catalog_metadata = metadata.to_map(catalog_entry['metadata']) - replication_key = catalog_metadata.get((), {}).get('replication-key') + catalog_metadata = metadata.to_map(catalog_entry["metadata"]) + replication_key = catalog_metadata.get((), {}).get("replication-key") - return (singer.get_bookmark(state, - catalog_entry['tap_stream_id'], - replication_key) or self.default_start_date) + return ( + singer.get_bookmark(state, catalog_entry["tap_stream_id"], replication_key) + or self.default_start_date + ) - def _build_query_string(self, catalog_entry, start_date, end_date=None, order_by_clause=True): + def _build_query_string( + self, catalog_entry, start_date, end_date=None, order_by_clause=True + ): selected_properties = self._get_selected_properties(catalog_entry) - query = "SELECT {} FROM {}".format("*", catalog_entry['stream']) + query = "SELECT {} FROM {}".format("*", catalog_entry["stream"]) - catalog_metadata = metadata.to_map(catalog_entry['metadata']) - replication_key = catalog_metadata.get((), {}).get('replication-key') + catalog_metadata = metadata.to_map(catalog_entry["metadata"]) + replication_key = catalog_metadata.get((), {}).get("replication-key") if replication_key: - where_clause = " WHERE {} > '{}' ".format( - replication_key, - start_date) + where_clause = " WHERE {} > '{}' ".format(replication_key, start_date) if end_date: end_date_clause = " AND {} <= {}".format(replication_key, end_date) else: @@ -455,9 +492,10 @@ def _build_query_string(self, catalog_entry, start_date, end_date=None, order_by # order_by = " ORDERBY {} ASC".format(replication_key) # if order_by_clause: # return query + where_clause + end_date_clause + order_by - + LOGGER.info(f"Executing query {query + where_clause + end_date_clause}") return query + where_clause + end_date_clause else: + LOGGER.info(f"Executing query {query}") return query def query(self, catalog_entry, state, state_passed): @@ -466,8 +504,8 @@ def query(self, catalog_entry, state, state_passed): return rest.query(catalog_entry, state) else: raise TapQuickbooksException( - "api_type should be REST was: {}".format( - self.api_type)) 
+ "api_type should be REST was: {}".format(self.api_type) + ) def query_report(self, catalog_entry, state, state_passed): start_date = singer_utils.strptime_with_tz(self.get_start_date(state, catalog_entry)) @@ -475,9 +513,19 @@ def query_report(self, catalog_entry, state, state_passed): state_passed = None if catalog_entry["stream"] == "BalanceSheetReport": - reader = BalanceSheetReport(self, start_date, state_passed) + reader = BalanceSheetReport( + self, + start_date, + state_passed, + pnl_adjusted_gain_loss=self.pnl_adjusted_gain_loss, + ) elif catalog_entry["stream"] == "MonthlyBalanceSheetReport": - reader = MonthlyBalanceSheetReport(self, start_date, state_passed) + reader = MonthlyBalanceSheetReport( + self, + start_date, + state_passed, + pnl_adjusted_gain_loss=self.pnl_adjusted_gain_loss, + ) elif catalog_entry["stream"] == "GeneralLedgerAccrualReport": reader = GeneralLedgerAccrualReport(self, start_date, state_passed) elif catalog_entry["stream"] == "GeneralLedgerCashReport": @@ -490,8 +538,16 @@ def query_report(self, catalog_entry, state, state_passed): reader = MonthlyCashFlowReport(self, start_date, state_passed) elif catalog_entry["stream"] == "ARAgingSummaryReport": reader = ARAgingSummaryReport(self, start_date, state_passed) + elif catalog_entry["stream"] == "ARAgingDetailReport": + reader = ARAgingDetailReport(self, start_date, state_passed) elif catalog_entry["stream"] == "TransactionListReport": reader = TransactionListReport(self, start_date, state_passed) else: - reader = ProfitAndLossDetailReport(self, start_date, state_passed) + reader = ProfitAndLossDetailReport( + self, + start_date, + state_passed, + pnl_adjusted_gain_loss=self.pnl_adjusted_gain_loss, + pnl_monthly=self.pnl_monthly, + ) return reader.sync(catalog_entry) diff --git a/tap_quickbooks/quickbooks/reportstreams/ARAgingSummaryReport.py b/tap_quickbooks/quickbooks/reportstreams/ARAgingSummaryReport.py index 0684c94..520b037 100644 --- a/tap_quickbooks/quickbooks/reportstreams/ARAgingSummaryReport.py +++ b/tap_quickbooks/quickbooks/reportstreams/ARAgingSummaryReport.py @@ -1,8 +1,6 @@ import datetime from typing import ClassVar, Dict, List, Optional - import singer - from tap_quickbooks.quickbooks.rest_reports import QuickbooksStream from tap_quickbooks.sync import transform_data_hook @@ -39,47 +37,61 @@ def sync(self, catalog_entry): "accounting_method": "Accrual" } - LOGGER.info(f"Fetch ARAgingSummary Report for period {params['start_date']} to {params['end_date']}") - resp = self._get(report_entity='AgedReceivables', params=params) - - # Get column metadata. - columns = self._get_column_metadata(resp) - - # Recursively get row data. 
- row_group = resp.get("Rows") - row_array = row_group.get("Row") + report_dates = [] + if self.qb.ar_aging_report_dates: + for report_date in self.qb.ar_aging_report_dates: + report_dates.append(report_date.split("T")[0]) + elif self.qb.ar_aging_report_date: + report_dates.append(self.qb.ar_aging_report_date.split("T")[0]) + else: + report_dates.append(None) # This is to Run the sync once without specific report_date + + for report_date in report_dates: + if report_date: + params["aging_method"] = "Report_Date" + params["report_date"] = report_date + LOGGER.info(f"Fetch ARAgingSummary Report for period {params['start_date']} to {params['end_date']} with aging_method 'Report_Date' and report_date {report_date}") + else: + LOGGER.info(f"Fetch ARAgingSummary Report for period {params['start_date']} to {params['end_date']}") + resp = self._get(report_entity='AgedReceivables', params=params) - if row_array is None: - return + # Get column metadata. + columns = self._get_column_metadata(resp) - output = [] - for row in row_array: - if "Header" in row: - output.append([i.get('value') for i in row.get("Header", {}).get("ColData", [])]) + # Recursively get row data. + row_group = resp.get("Rows") + row_array = row_group.get("Row") - for subrow in row.get("Rows", {}).get("Row", []): - output.append([i.get('value') for i in subrow.get("ColData", [])]) + if row_array is None: + return - output.append([i.get('value') for i in row.get("Summary", {}).get("ColData", [])]) - elif "Summary" in row: - output.append([i.get('value') for i in row.get("Summary", {}).get("ColData", [])]) - else: - output.append([i.get('value') for i in row.get("ColData", [])]) + output = [] + for row in row_array: + if "Header" in row: + output.append([i.get('value') for i in row.get("Header", {}).get("ColData", [])]) - # Zip columns and row data. - for raw_row in output: - row = dict(zip(columns, raw_row)) - if not row.get("Total"): - # If a row is missing the amount, skip it - continue + for subrow in row.get("Rows", {}).get("Row", []): + output.append([i.get('value') for i in subrow.get("ColData", [])]) - cleansed_row = {} - for k, v in row.items(): - if v == "": - continue + output.append([i.get('value') for i in row.get("Summary", {}).get("ColData", [])]) + elif "Summary" in row: + output.append([i.get('value') for i in row.get("Summary", {}).get("ColData", [])]) else: - cleansed_row.update({k: v}) - - cleansed_row["SyncTimestampUtc"] = singer.utils.strftime(singer.utils.now(), "%Y-%m-%dT%H:%M:%SZ") - - yield cleansed_row + output.append([i.get('value') for i in row.get("ColData", [])]) + + # Zip columns and row data. 
+ for raw_row in output: + row = dict(zip(columns, raw_row)) + row["report_date"] = report_date if report_date else end_date.strftime("%Y-%m-%d") + if not row.get("Total"): + # If a row is missing the amount, skip it + continue + + cleansed_row = {} + for k, v in row.items(): + if v == "": + continue + else: + cleansed_row.update({k: v}) + cleansed_row["SyncTimestampUtc"] = singer.utils.strftime(singer.utils.now(), "%Y-%m-%dT%H:%M:%SZ") + yield cleansed_row diff --git a/tap_quickbooks/quickbooks/reportstreams/ArAgingDetailReport.py b/tap_quickbooks/quickbooks/reportstreams/ArAgingDetailReport.py new file mode 100644 index 0000000..a9a1570 --- /dev/null +++ b/tap_quickbooks/quickbooks/reportstreams/ArAgingDetailReport.py @@ -0,0 +1,93 @@ +import datetime +from typing import ClassVar, Dict, List, Optional +import singer +from tap_quickbooks.quickbooks.rest_reports import QuickbooksStream +from tap_quickbooks.sync import transform_data_hook + +LOGGER = singer.get_logger() +NUMBER_OF_PERIODS = 3 + +class ARAgingDetailReport(QuickbooksStream): + tap_stream_id: ClassVar[str] = 'ARAgingDetailReport' + stream: ClassVar[str] = 'ARAgingDetailReport' + key_properties: ClassVar[List[str]] = [] + replication_method: ClassVar[str] = 'FULL_TABLE' + + def __init__(self, qb, start_date, state_passed): + self.qb = qb + self.start_date = start_date + self.state_passed = state_passed + + def _get_column_metadata(self, resp): + columns = [] + for column in resp.get("Columns").get("Column"): + if column.get("ColTitle") == "" and column.get("ColType") == "Customer": + columns.append("Customer") + else: + columns.append(column.get("ColTitle").replace(" ", "")) + return columns + + def sync(self, catalog_entry): + LOGGER.info(f"Starting full sync of ARAgingDetail") + end_date = datetime.date.today() + start_date = self.start_date + params = { + "start_date": start_date.strftime("%Y-%m-%d"), + "end_date": end_date.strftime("%Y-%m-%d"), + "accounting_method": "Accrual" + } + + report_dates = [] + if self.qb.ar_aging_report_dates: + for report_date in self.qb.ar_aging_report_dates: + report_dates.append(report_date.split("T")[0]) + elif self.qb.ar_aging_report_date: + report_dates.append(self.qb.ar_aging_report_date.split("T")[0]) + else: + report_dates.append(None) # This is to Run the sync once without specific report_date + + for report_date in report_dates: + if report_date: + params["aging_method"] = "Report_Date" + params["report_date"] = report_date + LOGGER.info(f"Fetch ARAgingDetail Report for period {params['start_date']} to {params['end_date']} with aging_method 'Report_Date' and report_date {report_date}") + else: + LOGGER.info(f"Fetch ARAgingDetail Report for period {params['start_date']} to {params['end_date']}") + resp = self._get(report_entity='AgedReceivableDetail', params=params) + + # Get column metadata. + columns = self._get_column_metadata(resp) + + # Recursively get row data. 
+ row_group = resp.get("Rows") + row_array = row_group.get("Row") + if row_array is None: + return + + output = [] + for row in row_array: + if "Header" in row: + output.append([i.get('value') for i in row.get("Header", {}).get("ColData", [])]) + + for subrow in row.get("Rows", {}).get("Row", []): + output.append([i.get('value') for i in subrow.get("ColData", [])]) + + output.append([i.get('value') for i in row.get("Summary", {}).get("ColData", [])]) + elif "Summary" in row: + output.append([i.get('value') for i in row.get("Summary", {}).get("ColData", [])]) + else: + output.append([i.get('value') for i in row.get("ColData", [])]) + + # Zip columns and row data. + for raw_row in output: + row = dict(zip(columns, raw_row)) + row["report_date"] = report_date if report_date else end_date.strftime("%Y-%m-%d") + + cleansed_row = {} + for k, v in row.items(): + if v == "": + continue + else: + cleansed_row.update({k: v}) + cleansed_row["SyncTimestampUtc"] = singer.utils.strftime(singer.utils.now(), "%Y-%m-%dT%H:%M:%SZ") + yield cleansed_row diff --git a/tap_quickbooks/quickbooks/reportstreams/BalanceSheetReport.py b/tap_quickbooks/quickbooks/reportstreams/BalanceSheetReport.py index 6772c80..886c955 100644 --- a/tap_quickbooks/quickbooks/reportstreams/BalanceSheetReport.py +++ b/tap_quickbooks/quickbooks/reportstreams/BalanceSheetReport.py @@ -9,16 +9,18 @@ LOGGER = singer.get_logger() NUMBER_OF_PERIODS = 3 + class BalanceSheetReport(QuickbooksStream): - tap_stream_id: ClassVar[str] = 'BalanceSheetReport' - stream: ClassVar[str] = 'BalanceSheetReport' + tap_stream_id: ClassVar[str] = "BalanceSheetReport" + stream: ClassVar[str] = "BalanceSheetReport" key_properties: ClassVar[List[str]] = [] - replication_method: ClassVar[str] = 'FULL_TABLE' + replication_method: ClassVar[str] = "FULL_TABLE" - def __init__(self, qb, start_date, state_passed): + def __init__(self, qb, start_date, state_passed, pnl_adjusted_gain_loss=None): self.qb = qb self.start_date = start_date self.state_passed = state_passed + self.pnl_adjusted_gain_loss = pnl_adjusted_gain_loss def _get_column_metadata(self, resp): columns = [] @@ -33,8 +35,9 @@ def _get_column_metadata(self, resp): return columns def _recursive_row_search(self, row, output, categories): + header = None row_group = row.get("Rows") - if 'ColData' in list(row.keys()): + if "ColData" in list(row.keys()): # Write the row data = row.get("ColData") values = [column.get("value") for column in data] @@ -45,12 +48,21 @@ def _recursive_row_search(self, row, output, categories): elif row_group is None or row_group == {}: pass else: - row_array = row_group.get("Row") + # row_array = row_group.get("Row") header = row.get("Header") if header is not None: categories.append(header.get("ColData")[0].get("value")) - for row in row_array: - self._recursive_row_search(row, output, categories) + for key, row_item in row.items(): + if isinstance(row_item, str): + continue + if "ColData" in list(row_item.keys()): + self._recursive_row_search(row_item, output, categories) + elif "Row" in list(row_item.keys()): + for sub_row in row_item["Row"]: + self._recursive_row_search(sub_row, output, categories) + elif isinstance(row_item.get(key), dict): + if key in row_item: + self._recursive_row_search(row_item[key], output, categories) if header is not None: categories.pop() @@ -64,11 +76,15 @@ def sync(self, catalog_entry): params = { "start_date": start_date.strftime("%Y-%m-%d"), "end_date": end_date.strftime("%Y-%m-%d"), - "accounting_method": "Accrual" + "accounting_method": "Accrual", 
} - - LOGGER.info(f"Fetch BalanceSheet Report for period {params['start_date']} to {params['end_date']}") - resp = self._get(report_entity='BalanceSheet', params=params) + if self.pnl_adjusted_gain_loss: + params.update({"adjusted_gain_loss": "true"}) + + LOGGER.info( + f"Fetch BalanceSheet Report for period {params['start_date']} to {params['end_date']}" + ) + resp = self._get(report_entity="BalanceSheet", params=params) # Get column metadata. columns = self._get_column_metadata(resp) @@ -100,7 +116,9 @@ def sync(self, catalog_entry): cleansed_row.update({k: v}) cleansed_row["Total"] = float(row.get("Total")) - cleansed_row["SyncTimestampUtc"] = singer.utils.strftime(singer.utils.now(), "%Y-%m-%dT%H:%M:%SZ") + cleansed_row["SyncTimestampUtc"] = singer.utils.strftime( + singer.utils.now(), "%Y-%m-%dT%H:%M:%SZ" + ) yield cleansed_row else: @@ -112,11 +130,13 @@ def sync(self, catalog_entry): params = { "start_date": start_date.strftime("%Y-%m-%d"), "end_date": end_date.strftime("%Y-%m-%d"), - "accounting_method": "Accrual" + "accounting_method": "Accrual", } - LOGGER.info(f"Fetch BalanceSheet for period {params['start_date']} to {params['end_date']}") - resp = self._get(report_entity='BalanceSheet', params=params) + LOGGER.info( + f"Fetch BalanceSheet for period {params['start_date']} to {params['end_date']}" + ) + resp = self._get(report_entity="BalanceSheet", params=params) # Get column metadata. columns = self._get_column_metadata(resp) @@ -150,7 +170,9 @@ def sync(self, catalog_entry): cleansed_row.update({k: v}) cleansed_row["Total"] = float(row.get("Total")) - cleansed_row["SyncTimestampUtc"] = singer.utils.strftime(singer.utils.now(), "%Y-%m-%dT%H:%M:%SZ") + cleansed_row["SyncTimestampUtc"] = singer.utils.strftime( + singer.utils.now(), "%Y-%m-%dT%H:%M:%SZ" + ) yield cleansed_row diff --git a/tap_quickbooks/quickbooks/reportstreams/MonthlyBalanceSheetReport.py b/tap_quickbooks/quickbooks/reportstreams/MonthlyBalanceSheetReport.py index 70a94aa..464bd4c 100644 --- a/tap_quickbooks/quickbooks/reportstreams/MonthlyBalanceSheetReport.py +++ b/tap_quickbooks/quickbooks/reportstreams/MonthlyBalanceSheetReport.py @@ -9,16 +9,18 @@ LOGGER = singer.get_logger() NUMBER_OF_PERIODS = 3 + class MonthlyBalanceSheetReport(QuickbooksStream): - tap_stream_id: ClassVar[str] = 'MonthlyBalanceSheetReport' - stream: ClassVar[str] = 'MonthlyBalanceSheetReport' + tap_stream_id: ClassVar[str] = "MonthlyBalanceSheetReport" + stream: ClassVar[str] = "MonthlyBalanceSheetReport" key_properties: ClassVar[List[str]] = [] - replication_method: ClassVar[str] = 'FULL_TABLE' + replication_method: ClassVar[str] = "FULL_TABLE" - def __init__(self, qb, start_date, state_passed): + def __init__(self, qb, start_date, state_passed, pnl_adjusted_gain_loss=None): self.qb = qb self.start_date = start_date self.state_passed = state_passed + self.pnl_adjusted_gain_loss = pnl_adjusted_gain_loss def _get_column_metadata(self, resp): columns = [] @@ -33,8 +35,9 @@ def _get_column_metadata(self, resp): return columns def _recursive_row_search(self, row, output, categories): + header = None row_group = row.get("Rows") - if 'ColData' in list(row.keys()): + if "ColData" in list(row.keys()): # Write the row data = row.get("ColData") values = [column.get("value") for column in data] @@ -45,19 +48,30 @@ def _recursive_row_search(self, row, output, categories): elif row_group is None or row_group == {}: pass else: - row_array = row_group.get("Row") + # row_array = row_group.get("Row") header = row.get("Header") if header is not None: 
categories.append(header.get("ColData")[0].get("value")) - for row in row_array: - self._recursive_row_search(row, output, categories) + for key, row_item in row.items(): + if isinstance(row_item, str): + continue + if "ColData" in list(row_item.keys()): + self._recursive_row_search(row_item, output, categories) + elif "Row" in list(row_item.keys()): + for sub_row in row_item["Row"]: + self._recursive_row_search(sub_row, output, categories) + elif isinstance(row_item.get(key), dict): + if key in row_item: + self._recursive_row_search( + row_item[key], output, categories + ) if header is not None: categories.pop() def sync(self, catalog_entry): full_sync = not self.state_passed - if full_sync: + if full_sync or self.qb.monthly_balance_sheet_full_sync: LOGGER.info(f"Starting full sync of MonthylBalanceSheet") end_date = datetime.date.today() start_date = self.start_date @@ -65,11 +79,16 @@ def sync(self, catalog_entry): "start_date": start_date.strftime("%Y-%m-%d"), "end_date": end_date.strftime("%Y-%m-%d"), "accounting_method": "Accrual", - "summarize_column_by": "Month" + "summarize_column_by": "Month", } - LOGGER.info(f"Fetch MonthlyBalanceSheet Report for period {params['start_date']} to {params['end_date']}") - resp = self._get(report_entity='BalanceSheet', params=params) + if self.pnl_adjusted_gain_loss: + params.update({"adjusted_gain_loss": "true"}) + + LOGGER.info( + f"Fetch MonthlyBalanceSheet Report for period {params['start_date']} to {params['end_date']}" + ) + resp = self._get(report_entity="BalanceSheet", params=params) # Get column metadata. columns = self._get_column_metadata(resp) @@ -100,17 +119,18 @@ def sync(self, catalog_entry): else: cleansed_row.update({k: v}) - cleansed_row["SyncTimestampUtc"] = singer.utils.strftime(singer.utils.now(), "%Y-%m-%dT%H:%M:%SZ") monthly_total = [] - for key,value in cleansed_row.items(): - if key not in ['Account', 'Categories', 'SyncTimestampUtc']: - monthly_total.append({key:value}) - cleansed_row['MonthlyTotal'] = monthly_total + for key, value in cleansed_row.items(): + if key not in ["Account", "Categories", "SyncTimestampUtc"]: + monthly_total.append({key: value}) + cleansed_row["MonthlyTotal"] = monthly_total yield cleansed_row else: - LOGGER.info(f"Syncing MonthlyBalanceSheet of last {NUMBER_OF_PERIODS} periods") + LOGGER.info( + f"Syncing MonthlyBalanceSheet of last {NUMBER_OF_PERIODS} periods" + ) end_date = datetime.date.today() for i in range(NUMBER_OF_PERIODS): @@ -119,11 +139,13 @@ def sync(self, catalog_entry): "start_date": start_date.strftime("%Y-%m-%d"), "end_date": end_date.strftime("%Y-%m-%d"), "accounting_method": "Accrual", - "summarize_column_by": "Month" + "summarize_column_by": "Month", } - LOGGER.info(f"Fetch MonthlyBalanceSheet for period {params['start_date']} to {params['end_date']}") - resp = self._get(report_entity='BalanceSheet', params=params) + LOGGER.info( + f"Fetch MonthlyBalanceSheet for period {params['start_date']} to {params['end_date']}" + ) + resp = self._get(report_entity="BalanceSheet", params=params) # Get column metadata. 
            columns = self._get_column_metadata(resp) @@ -151,13 +173,13 @@ def sync(self, catalog_entry):
                         continue
                     else:
                         cleansed_row.update({k: v})
-
+
                 cleansed_row["SyncTimestampUtc"] = singer.utils.strftime(singer.utils.now(), "%Y-%m-%dT%H:%M:%SZ")
                 monthly_total = []
-                for key,value in cleansed_row.items():
-                    if key not in ['Account', 'Categories', 'SyncTimestampUtc']:
-                        monthly_total.append({key:value})
-                cleansed_row['MonthlyTotal'] = monthly_total
+                for key, value in cleansed_row.items():
+                    if key not in ["Account", "Categories", "SyncTimestampUtc"]:
+                        monthly_total.append({key: value})
+                cleansed_row["MonthlyTotal"] = monthly_total
 
                 yield cleansed_row
 diff --git a/tap_quickbooks/quickbooks/reportstreams/ProfitAndLossDetailReport.py b/tap_quickbooks/quickbooks/reportstreams/ProfitAndLossDetailReport.py index a2267e4..511b795 100644 --- a/tap_quickbooks/quickbooks/reportstreams/ProfitAndLossDetailReport.py +++ b/tap_quickbooks/quickbooks/reportstreams/ProfitAndLossDetailReport.py @@ -6,21 +6,32 @@
 from tap_quickbooks.quickbooks.rest_reports import QuickbooksStream
 from tap_quickbooks.sync import transform_data_hook
 from dateutil.parser import parse
+import calendar
 
 LOGGER = singer.get_logger()
 NUMBER_OF_PERIODS = 3
 
+
 class ProfitAndLossDetailReport(QuickbooksStream):
-    tap_stream_id: ClassVar[str] = 'ProfitAndLossDetailReport'
-    stream: ClassVar[str] = 'ProfitAndLossDetailReport'
+    tap_stream_id: ClassVar[str] = "ProfitAndLossDetailReport"
+    stream: ClassVar[str] = "ProfitAndLossDetailReport"
     key_properties: ClassVar[List[str]] = []
-    replication_method: ClassVar[str] = 'FULL_TABLE'
+    replication_method: ClassVar[str] = "FULL_TABLE"
 
     current_account = {}
 
-    def __init__(self, qb, start_date, state_passed):
+    def __init__(
+        self,
+        qb,
+        start_date,
+        state_passed,
+        pnl_adjusted_gain_loss=None,
+        pnl_monthly=None,
+    ):
         self.qb = qb
         self.start_date = start_date
         self.state_passed = state_passed
+        self.pnl_adjusted_gain_loss = pnl_adjusted_gain_loss
+        self.pnl_monthly = pnl_monthly
 
     def _get_column_metadata(self, resp):
         columns = []
 @@ -34,11 +45,11 @@ def _get_column_metadata(self, resp):
 
     def _recursive_row_search(self, row, output, categories):
         row_group = row.get("Rows")
-        if row.get("type")=="Section":
+        if row.get("type") == "Section":
             if row.get("Header", {}).get("ColData", [{}]):
                 if row.get("Header", {}).get("ColData", [{}])[0].get("id"):
                     self.current_account = row.get("Header", {}).get("ColData", [{}])[0]
-        if 'ColData' in list(row.keys()):
+        if "ColData" in list(row.keys()):
             # Write the row
             data = row.get("ColData")
             values = [column for column in data]
 @@ -60,6 +71,10 @@ def _recursive_row_search(self, row, output, categories):
         if header is not None:
             categories.pop()
 
+    def get_days_in_month(self, start_date):
+        _, days_in_month = calendar.monthrange(start_date.year, start_date.month)
+        return days_in_month - 1
+
     def sync(self, catalog_entry):
         full_sync = not self.state_passed
 
 @@ -101,26 +116,38 @@ def sync(self, catalog_entry):
             "home_net_amount",
         ]
 
-        if full_sync:
+        if full_sync or self.qb.pl_detail_full_sync:
             start_date = self.start_date.date()
             delta = 30
 
             while start_date < datetime.date.today():
                 end_date = start_date + datetime.timedelta(delta)
                 if end_date > datetime.date.today():
                     end_date = datetime.date.today()
 
                 params = {
                     "start_date": start_date.strftime("%Y-%m-%d"),
                     "end_date": end_date.strftime("%Y-%m-%d"),
                     "accounting_method": "Accrual",
-                    "columns": ",".join(cols)
+                    "columns": ",".join(cols),
                 }
-
-                LOGGER.info(f"Fetch Journal Report for period {params['start_date']} to
{params['end_date']}") - resp = self._get(report_entity='ProfitAndLossDetail', params=params) + if self.pnl_adjusted_gain_loss: + params.update({"adjusted_gain_loss": "true"}) + # Don't send columns with this param + del params["columns"] + + LOGGER.info( + f"Fetch Journal Report for period {params['start_date']} to {params['end_date']}" + ) + LOGGER.info(f"Fetch Report with params {params}") + resp = self._get(report_entity="ProfitAndLossDetail", params=params) start_date = end_date + datetime.timedelta(1) + if self.pnl_monthly: + delta = self.get_days_in_month(start_date) # Get column metadata. columns = self._get_column_metadata(resp) @@ -153,17 +180,37 @@ def sync(self, catalog_entry): cleansed_row[f"{k}Id"] = v.get("id") else: cleansed_row[k] = v - - cleansed_row["Amount"] = float(cleansed_row.get("Amount")) if cleansed_row.get("Amount") else None - cleansed_row["Balance"] = float(cleansed_row.get("Balance")) if cleansed_row.get("Amount") else None - cleansed_row["SyncTimestampUtc"] = singer.utils.strftime(singer.utils.now(), "%Y-%m-%dT%H:%M:%SZ") - if cleansed_row.get('Date'): + try: + cleansed_row["Amount"] = ( + float(cleansed_row.get("Amount")) + if cleansed_row.get("Amount") + else None + ) + except: + cleansed_row["Amount"] = None + try: + cleansed_row["Balance"] = ( + float(cleansed_row.get("Balance")) + if cleansed_row.get("Amount") + else None + ) + except: + cleansed_row["Balance"] = None + cleansed_row["SyncTimestampUtc"] = singer.utils.strftime( + singer.utils.now(), "%Y-%m-%dT%H:%M:%SZ" + ) + if cleansed_row.get("Date"): try: - cleansed_row["Date"] = parse(cleansed_row['Date']) + cleansed_row["Date"] = parse(cleansed_row["Date"]) except: - continue + if "Unrealized" in cleansed_row["Date"]: + cleansed_row["TransactionType"] = cleansed_row["Date"] + cleansed_row["Date"] = end_date + else: + continue yield cleansed_row + else: LOGGER.info(f"Syncing P&L of last {NUMBER_OF_PERIODS} periods") end_date = datetime.date.today() @@ -174,14 +221,21 @@ def sync(self, catalog_entry): "start_date": start_date.strftime("%Y-%m-%d"), "end_date": end_date.strftime("%Y-%m-%d"), "accounting_method": "Accrual", - "columns": ",".join(cols) + "columns": ",".join(cols), } + if self.pnl_adjusted_gain_loss: + params.update({"adjusted_gain_loss": "true"}) + # Don't send columns with this param + del params["columns"] - LOGGER.info(f"Fetch Journal Report for period {params['start_date']} to {params['end_date']}") - resp = self._get(report_entity='ProfitAndLossDetail', params=params) + LOGGER.info( + f"Fetch Journal Report for period {params['start_date']} to {params['end_date']}" + ) + resp = self._get(report_entity="ProfitAndLossDetail", params=params) # Get column metadata. columns = self._get_column_metadata(resp) + columns += ["Account"] # Recursively get row data. 
row_group = resp.get("Rows") @@ -213,11 +267,29 @@ def sync(self, catalog_entry): else: cleansed_row[k] = v - cleansed_row["Amount"] = float(cleansed_row.get("Amount")) if cleansed_row.get("Amount") else None - cleansed_row["Balance"] = float(cleansed_row.get("Balance")) if cleansed_row.get("Amount") else None - cleansed_row["SyncTimestampUtc"] = singer.utils.strftime(singer.utils.now(), "%Y-%m-%dT%H:%M:%SZ") - if cleansed_row.get('Date'): - cleansed_row["Date"] = parse(cleansed_row['Date']) + cleansed_row["Amount"] = ( + float(cleansed_row.get("Amount")) + if cleansed_row.get("Amount") + else None + ) + cleansed_row["Balance"] = ( + float(cleansed_row.get("Balance")) + if cleansed_row.get("Amount") + else None + ) + cleansed_row["SyncTimestampUtc"] = singer.utils.strftime( + singer.utils.now(), "%Y-%m-%dT%H:%M:%SZ" + ) + + if cleansed_row.get("Date"): + try: + cleansed_row["Date"] = parse(cleansed_row["Date"]) + except: + if "Unrealized" in cleansed_row["Date"]: + cleansed_row["TransactionType"] = cleansed_row["Date"] + cleansed_row["Date"] = end_date + else: + continue yield cleansed_row diff --git a/tap_quickbooks/quickbooks/rest.py b/tap_quickbooks/quickbooks/rest.py index 582f9f7..90d38ad 100644 --- a/tap_quickbooks/quickbooks/rest.py +++ b/tap_quickbooks/quickbooks/rest.py @@ -63,17 +63,18 @@ def _query_recur( yield record except HTTPError as ex: - response = ex.response.json() - if isinstance(response, list) and response[0].get("errorCode") == "QUERY_TIMEOUT": - start_date = singer_utils.strptime_with_tz(start_date_str) - day_range = (end_date - start_date).days - LOGGER.info( - "Quickbooks returned QUERY_TIMEOUT querying %d days of %s", - day_range, - catalog_entry['stream']) - retryable = True - else: - raise ex + try: + response = ex.response.json() + if isinstance(response, list) and response[0].get("errorCode") == "QUERY_TIMEOUT": + start_date = singer_utils.strptime_with_tz(start_date_str) + day_range = (end_date - start_date).days + LOGGER.info( + "Quickbooks returned QUERY_TIMEOUT querying %d days of %s", + day_range, + catalog_entry['stream']) + retryable = True + except: + raise ex if retryable: start_date = singer_utils.strptime_with_tz(start_date_str) @@ -124,6 +125,9 @@ def _sync_records(self, url, headers, params, stream): # Establish number of records returned. count = resp_json['QueryResponse'].get('maxResults', 0) + LOGGER.info( + f"Synced {count} records for URL: {resp.request.url}" + ) # Make sure there is alteast one record. 
if count == 0: @@ -140,4 +144,4 @@ def _sync_records(self, url, headers, params, stream): if count < max: break; - offset = (max * page) + 1 \ No newline at end of file + offset = (max * page) + 1 diff --git a/tap_quickbooks/quickbooks/schemas/object_definition.json b/tap_quickbooks/quickbooks/schemas/object_definition.json index 0159001..cce46a8 100644 --- a/tap_quickbooks/quickbooks/schemas/object_definition.json +++ b/tap_quickbooks/quickbooks/schemas/object_definition.json @@ -505,7 +505,9 @@ {"name": "KlassId", "type": "string"}, {"name": "Class", "type": "string"}, {"name": "ClassId", "type": "string"}, - {"name": "Categories", "type": "array", "child_type": "string"} + {"name": "Categories", "type": "array", "child_type": "string"}, + {"name": "SupplierId", "type": "string"}, + {"name": "Supplier", "type": "string"} ], "GeneralLedgerCashReport": [ {"name": "Date", "type": "string"}, @@ -561,7 +563,19 @@ {"name": "31-60", "type": "number"}, {"name": "61-90", "type": "number"}, {"name": "91andover", "type": "number"}, - {"name": "Total", "type": "number"} + {"name": "Total", "type": "number"}, + {"name": "report_date", "type": "string"} + ], + "ARAgingDetailReport": [ + {"name": "Date", "type": "string"}, + {"name": "TransactionType", "type": "string"}, + {"name": "Num", "type": "string"}, + {"name": "No.", "type": "string"}, + {"name": "Customer", "type": "string"}, + {"name": "DueDate", "type": "string"}, + {"name": "Amount", "type": "number"}, + {"name": "OpenBalance", "type": "number"}, + {"name": "report_date", "type": "string"} ], "ProfitAndLossDetailReport": [ {"name": "Date", "type": "string"},