diff --git a/pyproject.toml b/pyproject.toml
index 35af3fe..6d60dea 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -14,6 +14,7 @@ python = "<3.11,>=3.7.1"
 requests = "^2.25.1"
 singer-sdk = "^0.4.4"
 "backports.cached-property" = "^1.0.1"
+curlify = "^2.2.1"

 [tool.poetry.dev-dependencies]
 pytest = "^6.2.5"
diff --git a/tap_hubspot_beta/client_base.py b/tap_hubspot_beta/client_base.py
index a51c751..937847c 100644
--- a/tap_hubspot_beta/client_base.py
+++ b/tap_hubspot_beta/client_base.py
@@ -13,6 +13,7 @@ from urllib3.exceptions import ProtocolError
 from singer_sdk.mapper import SameRecordTransform, StreamMap
 from singer_sdk.helpers._flattening import get_flattening_options
+import curlify

 from pendulum import parse
@@ -146,14 +147,20 @@ def validate_response(self, response: requests.Response) -> None:
                 f"{response.status_code} Server Error: "
                 f"{response.reason} for path: {self.path}"
             )
-            raise RetriableAPIError(msg)
+            curl_command = curlify.to_curl(response.request)
+            logging.error(f"Response code: {response.status_code}, info: {response.text}")
+            logging.error(f"CURL command for failed request: {curl_command}")
+            raise RetriableAPIError(f"Msg {msg}, response {response.text}")
         elif 400 <= response.status_code < 500:
             msg = (
                 f"{response.status_code} Client Error: "
                 f"{response.reason} for path: {self.path}"
             )
-            raise FatalAPIError(msg)
+            curl_command = curlify.to_curl(response.request)
+            logging.error(f"Response code: {response.status_code}, info: {response.text}")
+            logging.error(f"CURL command for failed request: {curl_command}")
+            raise FatalAPIError(f"Msg {msg}, response {response.text}")

     @staticmethod
     def extract_type(field):
diff --git a/tap_hubspot_beta/client_v3.py b/tap_hubspot_beta/client_v3.py
index 71bd9f1..336f4f0 100644
--- a/tap_hubspot_beta/client_v3.py
+++ b/tap_hubspot_beta/client_v3.py
@@ -38,6 +38,10 @@ def get_next_page_token(
     ) -> Optional[Any]:
         """Return a token for identifying next page or None if no more pages."""
         all_matches = extract_jsonpath(self.next_page_token_jsonpath, response.json())
+
+        if not previous_token:
+            self.logger.info(f"Total records to fetch for stream = {self.name}: {response.json().get('total')}")
+
         next_page_token = next(iter(all_matches), None)
         if next_page_token == "10000":
             start_date = self.stream_state.get("progress_markers", {}).get(
diff --git a/tap_hubspot_beta/streams.py b/tap_hubspot_beta/streams.py
index 5de5170..d263338 100644
--- a/tap_hubspot_beta/streams.py
+++ b/tap_hubspot_beta/streams.py
@@ -20,10 +20,14 @@ from tap_hubspot_beta.client_v4 import hubspotV4Stream
 import time
 import pytz
-from singer_sdk.helpers._state import log_sort_error
 from pendulum import parse
 from urllib.parse import urlencode
+from singer_sdk.helpers._state import (
+    finalize_state_progress_markers,
+    log_sort_error
+)
+
 association_schema = th.PropertiesList(
     th.Property("from_id", th.StringType),
     th.Property("to_id", th.StringType),
@@ -882,7 +886,6 @@ class TicketsStream(ObjectSearchV3):
     replication_key_filter = "hs_lastmodifieddate"
     properties_url = "properties/v2/tickets/properties"

-
 class DealsStream(ObjectSearchV3):
     """Deals Stream"""
@@ -894,14 +897,101 @@ class DealsStream(ObjectSearchV3):
     def get_child_context(self, record: dict, context) -> dict:
         return {"id": record["id"]}

-class DealsAssociationParent(DealsStream):
-    name = "deals_association_parent"
+
+class DealsAssociationParent(hubspotV1Stream):
+    name = "deals_association_parent"
+    path = "deals/v1/deal/paged"
     replication_key = None
     primary_keys = ["id"]
+
+    records_jsonpath = "$.deals[*]"
"$.deals[*]" + schema = th.PropertiesList( th.Property("id", th.StringType), ).to_dict() + def post_process(self, row, context): + row = super().post_process(row, context) + row["id"] = str(row["dealId"]) + return row + + def get_child_context(self, record: dict, context) -> dict: + return {"id": record["id"]} + + def _sync_records( # noqa C901 # too complex + self, context: Optional[dict] = None + ) -> None: + """Sync records, emitting RECORD and STATE messages. """ + record_count = 0 + current_context: Optional[dict] + context_list: Optional[List[dict]] + context_list = [context] if context is not None else self.partitions + selected = self.selected + + for current_context in context_list or [{}]: + partition_record_count = 0 + current_context = current_context or None + state = self.get_context_state(current_context) + state_partition_context = self._get_state_partition_context(current_context) + self._write_starting_replication_value(current_context) + child_context: Optional[dict] = ( + None if current_context is None else copy.copy(current_context) + ) + child_context_bulk = {"ids": []} + for record_result in self.get_records(current_context): + if isinstance(record_result, tuple): + # Tuple items should be the record and the child context + record, child_context = record_result + else: + record = record_result + child_context = copy.copy( + self.get_child_context(record=record, context=child_context) + ) + for key, val in (state_partition_context or {}).items(): + # Add state context to records if not already present + if key not in record: + record[key] = val + + # Sync children, except when primary mapper filters out the record + if self.stream_maps[0].get_filter_result(record): + child_context_bulk["ids"].append(child_context) + if len(child_context_bulk["ids"])>=5000: + self._sync_children(child_context_bulk) + child_context_bulk = {"ids": []} + self._check_max_record_limit(record_count) + if selected: + if (record_count - 1) % self.STATE_MSG_FREQUENCY == 0: + self._write_state_message() + self._write_record_message(record) + try: + self._increment_stream_state(record, context=current_context) + except InvalidStreamSortException as ex: + log_sort_error( + log_fn=self.logger.error, + ex=ex, + record_count=record_count + 1, + partition_record_count=partition_record_count + 1, + current_context=current_context, + state_partition_context=state_partition_context, + stream_name=self.name, + ) + raise ex + + record_count += 1 + partition_record_count += 1 + if len(child_context_bulk): + self._sync_children(child_context_bulk) + if current_context == state_partition_context: + # Finalize per-partition state only if 1:1 with context + finalize_state_progress_markers(state) + if not context: + # Finalize total stream only if we have the full full context. + # Otherwise will be finalized by tap at end of sync. 
+            finalize_state_progress_markers(self.stream_state)
+        self._write_record_count_log(record_count=record_count, context=context)
+        # Reset interim bookmarks before emitting final STATE message:
+        self._write_state_message()
+

 class ArchivedDealsStream(hubspotV3Stream):
     """Archived Deals Stream"""
@@ -1212,7 +1302,6 @@ class AssociationDealsStream(hubspotV4Stream):
         th.Property("associationTypes", th.CustomType({"type": ["array", "object"]})),
     ).to_dict()

-
 class AssociationContactsStream(hubspotV4Stream):
     """Association Base Stream"""
@@ -1249,14 +1338,12 @@ class AssociationDealsLineItemsStream(AssociationDealsStream):
     name = "associations_deals_line_items"
     path = "crm/v4/associations/deals/line_items/batch/read"

-
 class AssociationContactsTicketsStream(AssociationContactsStream):
     """Association Contacts -> Tickets Stream"""

     name = "associations_contacts_tickets"
     path = "crm/v4/associations/contacts/tickets/batch/read"

-
 class AssociationContactsStream(hubspotV4Stream):
     """Association Base Stream"""
@@ -1279,7 +1366,6 @@ class AssociationContactsCompaniesStream(AssociationContactsStream):
     name = "associations_contacts_companies"
     path = "crm/v4/associations/contacts/companies/batch/read"

-
 class MarketingEmailsStream(hubspotV1Stream):
     """Dispositions Stream"""
@@ -1383,7 +1469,6 @@ class MarketingEmailsStream(hubspotV1Stream):
         th.Property("vidsIncluded", th.CustomType({"type": ["array", "string"]})),
     ).to_dict()

-
 class PostalMailStream(ObjectSearchV3):
     """Owners Stream"""
@@ -1448,7 +1533,6 @@ class QuotesStream(ObjectSearchV3):
     replication_key_filter = "hs_lastmodifieddate"
     properties_url = "properties/v2/quotes/properties"

-
 class AssociationQuotesDealsStream(AssociationDealsStream):
     """Association Quotes -> Deals Stream"""
@@ -1457,7 +1541,7 @@ class CurrenciesStream(hubspotV3Stream):
-    """Owners Stream"""
+    """Currencies Stream"""

     name = "currencies_exchange_rate"
     path = "settings/v3/currencies/exchange-rates"
@@ -1689,4 +1773,4 @@ class AssociationTasksDealsStream(AssociationTasksStream):
     """Association Tasks -> Deals Stream"""

     name = "associations_tasks_deals"
-    path = "crm/v4/associations/tasks/deals/batch/read"
\ No newline at end of file
+    path = "crm/v4/associations/tasks/deals/batch/read"
diff --git a/tap_hubspot_beta/tap.py b/tap_hubspot_beta/tap.py
index aca02ee..1fbab36 100644
--- a/tap_hubspot_beta/tap.py
+++ b/tap_hubspot_beta/tap.py
@@ -176,15 +176,14 @@ def catalog_dict(self) -> dict:
             The tap's catalog as a dict
         """
         catalog = super().catalog_dict
-        if self.config.get("catalog_metadata", False):
-            streams = self.streams
-            for stream in catalog["streams"]:
-                stream_class = streams[stream["tap_stream_id"]]
-                stream["stream_meta"] = {}
-                if hasattr(stream_class, "load_fields_metadata"):
-                    stream_class.load_fields_metadata()
-                for field in stream["schema"]["properties"]:
-                    stream["schema"]["properties"][field]["field_meta"] = stream_class.fields_metadata.get(field, {})
+        streams = self.streams
+        for stream in catalog["streams"]:
+            stream_class = streams[stream["tap_stream_id"]]
+            stream["stream_meta"] = {}
+            if hasattr(stream_class, "load_fields_metadata") and stream["stream"] in ["deals", "lineitems", "contacts", "companies"]:
+                stream_class.load_fields_metadata()
+            for field in stream["schema"]["properties"]:
+                stream["schema"]["properties"][field]["field_meta"] = stream_class.fields_metadata.get(field, {})
         return catalog