From 4b44d584bffa72043df5da8b71d7bcc046732107 Mon Sep 17 00:00:00 2001 From: Vinicius Mesel <4984147+vmesel@users.noreply.github.com> Date: Mon, 6 Nov 2023 13:51:45 -0300 Subject: [PATCH 01/16] Adds support for field_meta/stream_meta on the catalog.json --- tap_hubspot_beta/client_base.py | 21 +++++++++++++++++++++ tap_hubspot_beta/streams.py | 7 +++++-- tap_hubspot_beta/tap.py | 18 ++++++++++++++++++ 3 files changed, 44 insertions(+), 2 deletions(-) diff --git a/tap_hubspot_beta/client_base.py b/tap_hubspot_beta/client_base.py index c915b90..d64411a 100644 --- a/tap_hubspot_beta/client_base.py +++ b/tap_hubspot_beta/client_base.py @@ -28,6 +28,27 @@ class hubspotStream(RESTStream): properties_url = None page_size = 100 + stream_metadata = {} + fields_metadata = {} + object_type = None + fields_metadata = {} + + def load_fields_metadata(self): + if not self.properties_url: + self.logger.info(f"Skipping fields_meta for {self.name} stream, because there is no properties_url set") + return + + req = requests.get( + f"{self.url_base}{self.properties_url}", + headers = self.authenticator.auth_headers or {}, + ) + + if req.status_code != 200: + self.logger.info(f"Skipping fields_meta for {self.name} stream") + return + + self.fields_metadata = {v["name"]: v for v in req.json()} + def _request( self, prepared_request: requests.PreparedRequest, context: Optional[dict] ) -> requests.Response: diff --git a/tap_hubspot_beta/streams.py b/tap_hubspot_beta/streams.py index db19347..ba5436a 100644 --- a/tap_hubspot_beta/streams.py +++ b/tap_hubspot_beta/streams.py @@ -60,6 +60,7 @@ class EngagementStream(hubspotV1Stream): primary_keys = ["id"] replication_key = None page_size = 250 + properties_url = "properties/v2/engagements/properties" schema = th.PropertiesList( th.Property("id", th.IntegerType), @@ -539,6 +540,7 @@ class FormSubmissionsStream(hubspotV1Stream): # NOTE: There is no primary_key for this stream replication_key = "submittedAt" path = "/form-integrations/v1/submissions/forms/{form_id}" + properties_url = "properties/v2/form_submissions/properties" schema = th.PropertiesList( th.Property("form_id", th.StringType), @@ -765,6 +767,7 @@ class CompaniesStream(ObjectSearchV3): """Companies Stream""" name = "companies" + object_type = "companies" path = "crm/v3/objects/companies/search" replication_key_filter = "hs_lastmodifieddate" properties_url = "properties/v1/companies/properties" @@ -1100,7 +1103,7 @@ class PostalMailStream(ObjectSearchV3): path = "crm/v3/objects/postal_mail/search" primary_keys = ["id"] replication_key_filter = "hs_lastmodifieddate" - properties_url = "properties/v1/postal_mail/properties" + properties_url = "properties/v2/postal_mail/properties" schema = th.PropertiesList( th.Property("id", th.StringType), @@ -1123,7 +1126,7 @@ class CommunicationsStream(ObjectSearchV3): path = "crm/v3/objects/communications/search" primary_keys = ["id"] replication_key_filter = "hs_lastmodifieddate" - properties_url = "properties/v1/communications/properties" + properties_url = "properties/v2/communications/properties" schema = th.PropertiesList( th.Property("id", th.StringType), diff --git a/tap_hubspot_beta/tap.py b/tap_hubspot_beta/tap.py index 5f55e52..1dbf91b 100644 --- a/tap_hubspot_beta/tap.py +++ b/tap_hubspot_beta/tap.py @@ -116,6 +116,24 @@ def discover_streams(self) -> List[Stream]: """Return a list of discovered streams.""" return [stream_class(tap=self) for stream_class in STREAM_TYPES] + @property + def catalog_dict(self) -> dict: + """Get catalog dictionary. 
+ + Returns: + The tap's catalog as a dict + """ + catalog = super().catalog_dict + streams = self.streams + for stream in catalog["streams"]: + stream_class = streams[stream["tap_stream_id"]] + stream["stream_meta"] = {} + if hasattr(stream_class, "load_fields_metadata"): + stream_class.load_fields_metadata() + for field in stream["schema"]["properties"]: + stream["schema"]["properties"][field]["field_meta"] = stream_class.fields_metadata.get(field, {}) + return catalog + if __name__ == "__main__": Taphubspot.cli() From a760de6e83fc7b0842bdaa2f8fb528133c8c51e1 Mon Sep 17 00:00:00 2001 From: Hassan Syyid Date: Thu, 16 Nov 2023 09:43:57 -0500 Subject: [PATCH 02/16] Handle case where lineitems_archived is not in catalog --- tap_hubspot_beta/streams.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tap_hubspot_beta/streams.py b/tap_hubspot_beta/streams.py index a817ddf..2ea0841 100644 --- a/tap_hubspot_beta/streams.py +++ b/tap_hubspot_beta/streams.py @@ -886,6 +886,10 @@ def selected(self) -> bool: Returns: True if the stream is selected. """ + # It has to be in the catalog or it will cause issues + if not self._tap.catalog.get("lineitems_archived"): + return False + try: # Make this stream auto-select if lineitems is selected self._tap.catalog["lineitems_archived"] = self._tap.catalog["lineitems"] From d4d97c1ac02bad377f107d3ddf1abcae32abb40e Mon Sep 17 00:00:00 2001 From: Davi Souza Date: Fri, 15 Dec 2023 20:41:30 -0300 Subject: [PATCH 03/16] Revert "Merge branch 'master' into feature/hgi-4472" This reverts commit 7b9a09b92e262ffa17dbd36b9cfc5064b24379b1, reversing changes made to a63dddf562292463262c9f1ef0cfbec72d56adc9. --- tap_hubspot_beta/client_base.py | 38 -------- tap_hubspot_beta/client_v1.py | 7 +- tap_hubspot_beta/client_v3.py | 33 +------ tap_hubspot_beta/streams.py | 158 +------------------------------- tap_hubspot_beta/tap.py | 27 ++---- 5 files changed, 18 insertions(+), 245 deletions(-) diff --git a/tap_hubspot_beta/client_base.py b/tap_hubspot_beta/client_base.py index 81f6fd6..d64411a 100644 --- a/tap_hubspot_beta/client_base.py +++ b/tap_hubspot_beta/client_base.py @@ -11,8 +11,6 @@ from singer_sdk.exceptions import FatalAPIError, RetriableAPIError from singer_sdk.streams import RESTStream from urllib3.exceptions import ProtocolError -from singer_sdk.mapper import SameRecordTransform, StreamMap -from singer_sdk.helpers._flattening import get_flattening_options from pendulum import parse @@ -264,42 +262,6 @@ def request_decorator(self, func): on_backoff=self.backoff_handler, )(func) return decorator - - @property - def stream_maps(self) -> List[StreamMap]: - """Get stream transformation maps. - - The 0th item is the primary stream map. List should not be empty. - - Returns: - A list of one or more map transformations for this stream. - """ - if self._stream_maps: - return self._stream_maps - - if self._tap.mapper: - #Append deals association stream if it is not in the catalog. - if self.name == "deals_association_parent" and self.name not in self._tap.mapper.stream_maps: - self._tap.mapper.stream_maps.update({"deals_association_parent":self._tap.mapper.stream_maps["deals"]}) - self._tap.mapper.stream_maps["deals_association_parent"][0].stream_alias = "deals_association_parent" - self._stream_maps = self._tap.mapper.stream_maps[self.name] - self.logger.info( - f"Tap has custom mapper. Using {len(self.stream_maps)} provided map(s)." - ) - else: - self.logger.info( - f"No custom mapper provided for '{self.name}'. " - "Using SameRecordTransform." 
- ) - self._stream_maps = [ - SameRecordTransform( - stream_alias=self.name, - raw_schema=self.schema, - key_properties=self.primary_keys, - flattening_options=get_flattening_options(self.config), - ) - ] - return self._stream_maps class hubspotStreamSchema(hubspotStream): diff --git a/tap_hubspot_beta/client_v1.py b/tap_hubspot_beta/client_v1.py index 4f6aa1d..36d6203 100644 --- a/tap_hubspot_beta/client_v1.py +++ b/tap_hubspot_beta/client_v1.py @@ -56,10 +56,9 @@ def get_url_params( def post_process(self, row: dict, context: Optional[dict]) -> dict: """As needed, append or transform raw data to match expected structure.""" if self.properties_url: - if row.get("properties"): - for name, value in row.get("properties", {}).items(): - row[name] = value.get("value") - del row["properties"] + for name, value in row["properties"].items(): + row[name] = value.get("value") + del row["properties"] for field in self.datetime_fields: if row.get(field) is not None: if row.get(field) in [0, ""]: diff --git a/tap_hubspot_beta/client_v3.py b/tap_hubspot_beta/client_v3.py index 6c49b83..feda1f3 100644 --- a/tap_hubspot_beta/client_v3.py +++ b/tap_hubspot_beta/client_v3.py @@ -26,7 +26,6 @@ class hubspotV3SearchStream(hubspotStream): filter = None starting_time = None page_size = 100 - special_replication = False def get_starting_time(self, context): start_date = self.get_starting_timestamp(context) @@ -39,29 +38,8 @@ def get_next_page_token( """Return a token for identifying next page or None if no more pages.""" all_matches = extract_jsonpath(self.next_page_token_jsonpath, response.json()) next_page_token = next(iter(all_matches), None) - if next_page_token == "10000": - start_date = self.stream_state.get("progress_markers", {}).get( - "replication_key_value" - ) - if self.name in ["deals_association_parent"]: - data = response.json() - # extract maximum modified date to overcome 10000 pagination limit - hs_lastmodifieddates = [ - entry["properties"]["hs_lastmodifieddate"] - for entry in data["results"] - if "properties" in entry - and "hs_lastmodifieddate" in entry["properties"] - ] - hs_lastmodifieddates = [ - date for date in hs_lastmodifieddates if date is not None - ] - max_date = max(hs_lastmodifieddates) if hs_lastmodifieddates else None - if max_date: - start_date = max_date - self.special_replication = True - else: - return None - + if next_page_token=="10000": + start_date = self.stream_state.get("progress_markers", {}).get("replication_key_value") if start_date: start_date = parse(start_date) self.starting_time = int(start_date.timestamp() * 1000) @@ -80,7 +58,7 @@ def prepare_request_payload( payload["filters"].append(self.filter) if next_page_token and next_page_token!="0": payload["after"] = next_page_token - if self.replication_key and starting_time or self.special_replication: + if self.replication_key and starting_time: payload["filters"].append( { "propertyName": self.replication_key_filter, @@ -93,10 +71,7 @@ def prepare_request_payload( "direction": "ASCENDING" }] if self.properties_url: - if self.name =="deals_association_parent": - payload["properties"] = ["id"] - else: - payload["properties"] = self.selected_properties + payload["properties"] = self.selected_properties else: payload["properties"] = [] return payload diff --git a/tap_hubspot_beta/streams.py b/tap_hubspot_beta/streams.py index 607cab6..3fb15f4 100644 --- a/tap_hubspot_beta/streams.py +++ b/tap_hubspot_beta/streams.py @@ -780,72 +780,6 @@ class CompaniesStream(ObjectSearchV3): replication_key_filter = 
"hs_lastmodifieddate" properties_url = "properties/v1/companies/properties" - -class ArchivedCompaniesStream(hubspotV3Stream): - """Archived Companies Stream""" - - name = "companies_archived" - replication_key = "archivedAt" - path = "crm/v3/objects/companies?archived=true" - properties_url = "properties/v1/companies/properties" - primary_keys = ["id"] - - base_properties = [ - th.Property("id", th.StringType), - th.Property("archived", th.BooleanType), - th.Property("archivedAt", th.DateTimeType), - th.Property("createdAt", th.DateTimeType), - th.Property("updatedAt", th.DateTimeType) - ] - - @property - def selected(self) -> bool: - """Check if stream is selected. - - Returns: - True if the stream is selected. - """ - # It has to be in the catalog or it will cause issues - if not self._tap.catalog.get("companies_archived"): - return False - - try: - # Make this stream auto-select if companies is selected - self._tap.catalog["companies_archived"] = self._tap.catalog["companies"] - return self.mask.get((), False) or self._tap.catalog["companies"].metadata.get(()).selected - except: - return self.mask.get((), False) - - def _write_record_message(self, record: dict) -> None: - """Write out a RECORD message. - - Args: - record: A single stream record. - """ - for record_message in self._generate_record_messages(record): - # force this to think it's the companies stream - record_message.stream = "companies" - singer.write_message(record_message) - - @property - def metadata(self): - new_metadata = super().metadata - new_metadata[("properties", "archivedAt")].selected = True - new_metadata[("properties", "archivedAt")].selected_by_default = True - return new_metadata - - def post_process(self, row, context): - row = super().post_process(row, context) - - rep_key = self.get_starting_timestamp(context).replace(tzinfo=pytz.utc) - archived_at = parse(row['archivedAt']).replace(tzinfo=pytz.utc) - - if archived_at > rep_key: - return row - - return None - - class TicketsStream(ObjectSearchV3): """Companies Stream""" @@ -854,7 +788,6 @@ class TicketsStream(ObjectSearchV3): replication_key_filter = "hs_lastmodifieddate" properties_url = "properties/v2/tickets/properties" - class DealsStream(ObjectSearchV3): """Deals Stream""" @@ -866,79 +799,6 @@ class DealsStream(ObjectSearchV3): def get_child_context(self, record: dict, context) -> dict: return {"id": record["id"]} -class DealsAssociationParent(DealsStream): - name = "deals_association_parent" - replication_key = None - primary_keys = ["id"] - schema = th.PropertiesList( - th.Property("id", th.IntegerType), - ).to_dict() - - -class ArchivedDealsStream(hubspotV3Stream): - """Archived Deals Stream""" - - name = "deals_archived" - replication_key = "archivedAt" - path = "crm/v3/objects/deals?archived=true" - properties_url = "properties/v1/deals/properties" - primary_keys = ["id"] - - base_properties = [ - th.Property("id", th.StringType), - th.Property("archived", th.BooleanType), - th.Property("archivedAt", th.DateTimeType), - th.Property("createdAt", th.DateTimeType), - th.Property("updatedAt", th.DateTimeType) - ] - - @property - def metadata(self): - new_metadata = super().metadata - new_metadata[("properties", "archivedAt")].selected = True - new_metadata[("properties", "archivedAt")].selected_by_default = True - return new_metadata - - @property - def selected(self) -> bool: - """Check if stream is selected. - - Returns: - True if the stream is selected. 
- """ - # It has to be in the catalog or it will cause issues - if not self._tap.catalog.get("deals_archived"): - return False - - try: - # Make this stream auto-select if deals is selected - self._tap.catalog["deals_archived"] = self._tap.catalog["deals"] - return self.mask.get((), False) or self._tap.catalog["deals"].metadata.get(()).selected - except: - return self.mask.get((), False) - - def _write_record_message(self, record: dict) -> None: - """Write out a RECORD message. - - Args: - record: A single stream record. - """ - for record_message in self._generate_record_messages(record): - # force this to think it's the deals stream - record_message.stream = "deals" - singer.write_message(record_message) - - def post_process(self, row, context): - row = super().post_process(row, context) - - rep_key = self.get_starting_timestamp(context).replace(tzinfo=pytz.utc) - archived_at = parse(row['archivedAt']).replace(tzinfo=pytz.utc) - - if archived_at > rep_key: - return row - - return None - class ProductsStream(ObjectSearchV3): """Products Stream""" @@ -1020,13 +880,6 @@ class ArchivedLineItemsStream(hubspotV3Stream): th.Property("updatedAt", th.DateTimeType) ] - @property - def metadata(self): - new_metadata = super().metadata - new_metadata[("properties", "archivedAt")].selected = True - new_metadata[("properties", "archivedAt")].selected_by_default = True - return new_metadata - @property def selected(self) -> bool: """Check if stream is selected. @@ -1137,7 +990,7 @@ class AssociationDealsStream(hubspotV4Stream): """Association Base Stream""" primary_keys = ["from_id", "to_id"] - parent_stream_type = DealsAssociationParent + parent_stream_type = DealsStream schema = th.PropertiesList( th.Property("from_id", th.StringType), @@ -1148,7 +1001,6 @@ class AssociationDealsStream(hubspotV4Stream): th.Property("associationTypes", th.CustomType({"type": ["array", "object"]})), ).to_dict() - class AssociationContactsStream(hubspotV4Stream): """Association Base Stream""" @@ -1185,14 +1037,12 @@ class AssociationDealsLineItemsStream(AssociationDealsStream): name = "associations_deals_line_items" path = "crm/v4/associations/deals/line_items/batch/read" - class AssociationContactsTicketsStream(AssociationContactsStream): """Association Contacts -> Tickets Stream""" name = "associations_contacts_tickets" path = "crm/v4/associations/contacts/tickets/batch/read" - class AssociationContactsStream(hubspotV4Stream): """Association Base Stream""" @@ -1215,7 +1065,6 @@ class AssociationContactsCompaniesStream(AssociationContactsStream): name = "associations_contacts_companies" path = "crm/v4/associations/contacts/companies/batch/read" - class MarketingEmailsStream(hubspotV1Stream): """Dispositions Stream""" @@ -1319,7 +1168,6 @@ class MarketingEmailsStream(hubspotV1Stream): th.Property("vidsIncluded", th.CustomType({"type": ["array", "string"]})), ).to_dict() - class PostalMailStream(ObjectSearchV3): """Owners Stream""" @@ -1343,8 +1191,6 @@ class PostalMailStream(ObjectSearchV3): th.Property("archived", th.BooleanType), th.Property("associations", th.CustomType({"type": ["object", "array"]})), ).to_dict() - - class CommunicationsStream(ObjectSearchV3): """Owners Stream""" @@ -1369,7 +1215,6 @@ class CommunicationsStream(ObjectSearchV3): th.Property("associations", th.CustomType({"type": ["object", "array"]})), ).to_dict() - class QuotesStream(ObjectSearchV3): """Products Stream""" @@ -1378,7 +1223,6 @@ class QuotesStream(ObjectSearchV3): replication_key_filter = "hs_lastmodifieddate" properties_url = 
"properties/v2/quotes/properties" - class AssociationQuotesDealsStream(AssociationDealsStream): """Association Quotes -> Deals Stream""" diff --git a/tap_hubspot_beta/tap.py b/tap_hubspot_beta/tap.py index 32621d7..2851637 100644 --- a/tap_hubspot_beta/tap.py +++ b/tap_hubspot_beta/tap.py @@ -43,10 +43,7 @@ QuotesStream, AssociationQuotesDealsStream, ListMembershipV3Stream, - ListSearchV3Stream, - ArchivedCompaniesStream, - ArchivedDealsStream, - DealsAssociationParent + ListSearchV3Stream ) STREAM_TYPES = [ @@ -87,10 +84,7 @@ QuotesStream, AssociationQuotesDealsStream, ListMembershipV3Stream, - ListSearchV3Stream, - ArchivedCompaniesStream, - ArchivedDealsStream, - DealsAssociationParent + ListSearchV3Stream ] @@ -132,15 +126,14 @@ def catalog_dict(self) -> dict: The tap's catalog as a dict """ catalog = super().catalog_dict - if self.config.get("catalog_metadata", False): - streams = self.streams - for stream in catalog["streams"]: - stream_class = streams[stream["tap_stream_id"]] - stream["stream_meta"] = {} - if hasattr(stream_class, "load_fields_metadata"): - stream_class.load_fields_metadata() - for field in stream["schema"]["properties"]: - stream["schema"]["properties"][field]["field_meta"] = stream_class.fields_metadata.get(field, {}) + streams = self.streams + for stream in catalog["streams"]: + stream_class = streams[stream["tap_stream_id"]] + stream["stream_meta"] = {} + if hasattr(stream_class, "load_fields_metadata"): + stream_class.load_fields_metadata() + for field in stream["schema"]["properties"]: + stream["schema"]["properties"][field]["field_meta"] = stream_class.fields_metadata.get(field, {}) return catalog From 5991198d067d984e4b7f67031c2598051c6439cc Mon Sep 17 00:00:00 2001 From: xacadil <92389481+xacadil@users.noreply.github.com> Date: Tue, 5 Dec 2023 23:45:19 +0500 Subject: [PATCH 04/16] Deals association separate stream with full sync (#15) --- tap_hubspot_beta/client_v3.py | 5 ++++- tap_hubspot_beta/streams.py | 10 +++++++++- tap_hubspot_beta/tap.py | 6 ++++-- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/tap_hubspot_beta/client_v3.py b/tap_hubspot_beta/client_v3.py index feda1f3..5ebee8b 100644 --- a/tap_hubspot_beta/client_v3.py +++ b/tap_hubspot_beta/client_v3.py @@ -71,7 +71,10 @@ def prepare_request_payload( "direction": "ASCENDING" }] if self.properties_url: - payload["properties"] = self.selected_properties + if self.name =="deals_association_parent": + payload["properties"] = ["id"] + else: + payload["properties"] = self.selected_properties else: payload["properties"] = [] return payload diff --git a/tap_hubspot_beta/streams.py b/tap_hubspot_beta/streams.py index 3fb15f4..106f86c 100644 --- a/tap_hubspot_beta/streams.py +++ b/tap_hubspot_beta/streams.py @@ -799,6 +799,14 @@ class DealsStream(ObjectSearchV3): def get_child_context(self, record: dict, context) -> dict: return {"id": record["id"]} +class DealsAssociationParent(DealsStream): + name = "deals_association_parent" + replication_key = None + primary_keys = ["id"] + schema = th.PropertiesList( + th.Property("id", th.IntegerType), + ).to_dict() + class ProductsStream(ObjectSearchV3): """Products Stream""" @@ -990,7 +998,7 @@ class AssociationDealsStream(hubspotV4Stream): """Association Base Stream""" primary_keys = ["from_id", "to_id"] - parent_stream_type = DealsStream + parent_stream_type = DealsAssociationParent schema = th.PropertiesList( th.Property("from_id", th.StringType), diff --git a/tap_hubspot_beta/tap.py b/tap_hubspot_beta/tap.py index 2851637..cf6cfcd 100644 --- 
a/tap_hubspot_beta/tap.py +++ b/tap_hubspot_beta/tap.py @@ -43,7 +43,8 @@ QuotesStream, AssociationQuotesDealsStream, ListMembershipV3Stream, - ListSearchV3Stream + ListSearchV3Stream, + DealsAssociationParent ) STREAM_TYPES = [ @@ -84,7 +85,8 @@ QuotesStream, AssociationQuotesDealsStream, ListMembershipV3Stream, - ListSearchV3Stream + ListSearchV3Stream, + DealsAssociationParent ] From 78f9f23f44125e2f8034363988379d755ef064da Mon Sep 17 00:00:00 2001 From: xacadil <92389481+xacadil@users.noreply.github.com> Date: Wed, 6 Dec 2023 23:56:09 +0500 Subject: [PATCH 05/16] Fix to automatically add deals_association_parent stream if it is missing in the catalog (#16) --- tap_hubspot_beta/client_base.py | 38 +++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/tap_hubspot_beta/client_base.py b/tap_hubspot_beta/client_base.py index d64411a..81f6fd6 100644 --- a/tap_hubspot_beta/client_base.py +++ b/tap_hubspot_beta/client_base.py @@ -11,6 +11,8 @@ from singer_sdk.exceptions import FatalAPIError, RetriableAPIError from singer_sdk.streams import RESTStream from urllib3.exceptions import ProtocolError +from singer_sdk.mapper import SameRecordTransform, StreamMap +from singer_sdk.helpers._flattening import get_flattening_options from pendulum import parse @@ -262,6 +264,42 @@ def request_decorator(self, func): on_backoff=self.backoff_handler, )(func) return decorator + + @property + def stream_maps(self) -> List[StreamMap]: + """Get stream transformation maps. + + The 0th item is the primary stream map. List should not be empty. + + Returns: + A list of one or more map transformations for this stream. + """ + if self._stream_maps: + return self._stream_maps + + if self._tap.mapper: + #Append deals association stream if it is not in the catalog. + if self.name == "deals_association_parent" and self.name not in self._tap.mapper.stream_maps: + self._tap.mapper.stream_maps.update({"deals_association_parent":self._tap.mapper.stream_maps["deals"]}) + self._tap.mapper.stream_maps["deals_association_parent"][0].stream_alias = "deals_association_parent" + self._stream_maps = self._tap.mapper.stream_maps[self.name] + self.logger.info( + f"Tap has custom mapper. Using {len(self.stream_maps)} provided map(s)." + ) + else: + self.logger.info( + f"No custom mapper provided for '{self.name}'. " + "Using SameRecordTransform." 
+ ) + self._stream_maps = [ + SameRecordTransform( + stream_alias=self.name, + raw_schema=self.schema, + key_properties=self.primary_keys, + flattening_options=get_flattening_options(self.config), + ) + ] + return self._stream_maps class hubspotStreamSchema(hubspotStream): From 70119f3d42f60f9c7a9da8b3c961b3fb3c69128f Mon Sep 17 00:00:00 2001 From: xacadil <92389481+xacadil@users.noreply.github.com> Date: Sat, 9 Dec 2023 02:30:24 +0500 Subject: [PATCH 06/16] infinite loop fix for non-inremental Deals association parent stream (#17) --- tap_hubspot_beta/client_v3.py | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/tap_hubspot_beta/client_v3.py b/tap_hubspot_beta/client_v3.py index 5ebee8b..6c49b83 100644 --- a/tap_hubspot_beta/client_v3.py +++ b/tap_hubspot_beta/client_v3.py @@ -26,6 +26,7 @@ class hubspotV3SearchStream(hubspotStream): filter = None starting_time = None page_size = 100 + special_replication = False def get_starting_time(self, context): start_date = self.get_starting_timestamp(context) @@ -38,8 +39,29 @@ def get_next_page_token( """Return a token for identifying next page or None if no more pages.""" all_matches = extract_jsonpath(self.next_page_token_jsonpath, response.json()) next_page_token = next(iter(all_matches), None) - if next_page_token=="10000": - start_date = self.stream_state.get("progress_markers", {}).get("replication_key_value") + if next_page_token == "10000": + start_date = self.stream_state.get("progress_markers", {}).get( + "replication_key_value" + ) + if self.name in ["deals_association_parent"]: + data = response.json() + # extract maximum modified date to overcome 10000 pagination limit + hs_lastmodifieddates = [ + entry["properties"]["hs_lastmodifieddate"] + for entry in data["results"] + if "properties" in entry + and "hs_lastmodifieddate" in entry["properties"] + ] + hs_lastmodifieddates = [ + date for date in hs_lastmodifieddates if date is not None + ] + max_date = max(hs_lastmodifieddates) if hs_lastmodifieddates else None + if max_date: + start_date = max_date + self.special_replication = True + else: + return None + if start_date: start_date = parse(start_date) self.starting_time = int(start_date.timestamp() * 1000) @@ -58,7 +80,7 @@ def prepare_request_payload( payload["filters"].append(self.filter) if next_page_token and next_page_token!="0": payload["after"] = next_page_token - if self.replication_key and starting_time: + if self.replication_key and starting_time or self.special_replication: payload["filters"].append( { "propertyName": self.replication_key_filter, From 1b9d13cbce165601918e961afa25ef8d5b3fc57d Mon Sep 17 00:00:00 2001 From: Vinicius Mesel <4984147+vmesel@users.noreply.github.com> Date: Mon, 18 Dec 2023 11:58:50 -0300 Subject: [PATCH 07/16] Restricts which streams generate metadata --- tap_hubspot_beta/tap.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_hubspot_beta/tap.py b/tap_hubspot_beta/tap.py index cf6cfcd..9fa1eb8 100644 --- a/tap_hubspot_beta/tap.py +++ b/tap_hubspot_beta/tap.py @@ -132,7 +132,7 @@ def catalog_dict(self) -> dict: for stream in catalog["streams"]: stream_class = streams[stream["tap_stream_id"]] stream["stream_meta"] = {} - if hasattr(stream_class, "load_fields_metadata"): + if hasattr(stream_class, "load_fields_metadata") and stream in ["deals", "lineitems", "contacts"]: stream_class.load_fields_metadata() for field in stream["schema"]["properties"]: stream["schema"]["properties"][field]["field_meta"] = 
stream_class.fields_metadata.get(field, {}) From 372fb31bbed663c6e441a6885712585b9fcdd7fc Mon Sep 17 00:00:00 2001 From: Vinicius Mesel <4984147+vmesel@users.noreply.github.com> Date: Mon, 18 Dec 2023 12:23:01 -0300 Subject: [PATCH 08/16] Adds field meta --- tap_hubspot_beta/tap.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_hubspot_beta/tap.py b/tap_hubspot_beta/tap.py index 9fa1eb8..09466c4 100644 --- a/tap_hubspot_beta/tap.py +++ b/tap_hubspot_beta/tap.py @@ -132,7 +132,7 @@ def catalog_dict(self) -> dict: for stream in catalog["streams"]: stream_class = streams[stream["tap_stream_id"]] stream["stream_meta"] = {} - if hasattr(stream_class, "load_fields_metadata") and stream in ["deals", "lineitems", "contacts"]: + if hasattr(stream_class, "load_fields_metadata") and stream["stream"] in ["deals", "lineitems", "contacts"]: stream_class.load_fields_metadata() for field in stream["schema"]["properties"]: stream["schema"]["properties"][field]["field_meta"] = stream_class.fields_metadata.get(field, {}) From 5c1db43cd51384ec918c4905e8d622f42d12ff84 Mon Sep 17 00:00:00 2001 From: Vinicius Mesel <4984147+vmesel@users.noreply.github.com> Date: Mon, 18 Dec 2023 16:39:24 -0300 Subject: [PATCH 09/16] Adds companies to available fields_metadata --- tap_hubspot_beta/tap.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_hubspot_beta/tap.py b/tap_hubspot_beta/tap.py index 09466c4..ae7415c 100644 --- a/tap_hubspot_beta/tap.py +++ b/tap_hubspot_beta/tap.py @@ -132,7 +132,7 @@ def catalog_dict(self) -> dict: for stream in catalog["streams"]: stream_class = streams[stream["tap_stream_id"]] stream["stream_meta"] = {} - if hasattr(stream_class, "load_fields_metadata") and stream["stream"] in ["deals", "lineitems", "contacts"]: + if hasattr(stream_class, "load_fields_metadata") and stream["stream"] in ["deals", "lineitems", "contacts", "companies"]: stream_class.load_fields_metadata() for field in stream["schema"]["properties"]: stream["schema"]["properties"][field]["field_meta"] = stream_class.fields_metadata.get(field, {}) From 85b0aab949600e2b406363e1460d20305af5caa9 Mon Sep 17 00:00:00 2001 From: Hassan Syyid Date: Mon, 8 Jan 2024 15:22:16 -0500 Subject: [PATCH 10/16] add archived streams back --- tap_hubspot_beta/tap.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tap_hubspot_beta/tap.py b/tap_hubspot_beta/tap.py index ae7415c..1fde780 100644 --- a/tap_hubspot_beta/tap.py +++ b/tap_hubspot_beta/tap.py @@ -44,6 +44,8 @@ AssociationQuotesDealsStream, ListMembershipV3Stream, ListSearchV3Stream, + ArchivedCompaniesStream, + ArchivedDealsStream, DealsAssociationParent ) @@ -86,6 +88,8 @@ AssociationQuotesDealsStream, ListMembershipV3Stream, ListSearchV3Stream, + ArchivedCompaniesStream, + ArchivedDealsStream, DealsAssociationParent ] From 0739e07fbe83e07e770c783a8131be007d50ffc0 Mon Sep 17 00:00:00 2001 From: Hassan Syyid Date: Mon, 8 Jan 2024 15:25:48 -0500 Subject: [PATCH 11/16] fix consistency --- tap_hubspot_beta/client_v1.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tap_hubspot_beta/client_v1.py b/tap_hubspot_beta/client_v1.py index 36d6203..4f6aa1d 100644 --- a/tap_hubspot_beta/client_v1.py +++ b/tap_hubspot_beta/client_v1.py @@ -56,9 +56,10 @@ def get_url_params( def post_process(self, row: dict, context: Optional[dict]) -> dict: """As needed, append or transform raw data to match expected structure.""" if self.properties_url: - for name, value in row["properties"].items(): - row[name] = value.get("value") - del 
row["properties"] + if row.get("properties"): + for name, value in row.get("properties", {}).items(): + row[name] = value.get("value") + del row["properties"] for field in self.datetime_fields: if row.get(field) is not None: if row.get(field) in [0, ""]: From 6ae70095a83140ea5874aca0085bc299093bda0d Mon Sep 17 00:00:00 2001 From: Hassan Syyid Date: Mon, 8 Jan 2024 15:26:41 -0500 Subject: [PATCH 12/16] Clean up --- tap_hubspot_beta/streams.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tap_hubspot_beta/streams.py b/tap_hubspot_beta/streams.py index b5a35b3..30bdc5c 100644 --- a/tap_hubspot_beta/streams.py +++ b/tap_hubspot_beta/streams.py @@ -1039,6 +1039,13 @@ class ArchivedLineItemsStream(hubspotV3Stream): th.Property("updatedAt", th.DateTimeType) ] + @property + def metadata(self): + new_metadata = super().metadata + new_metadata[("properties", "archivedAt")].selected = True + new_metadata[("properties", "archivedAt")].selected_by_default = True + return new_metadata + @property def selected(self) -> bool: """Check if stream is selected. From 808676d8ac508b0e108b72457e67cc9967f215d6 Mon Sep 17 00:00:00 2001 From: Hassan Syyid Date: Wed, 10 Jan 2024 15:22:50 -0500 Subject: [PATCH 13/16] add log of total records for companies stream --- tap_hubspot_beta/client_v3.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tap_hubspot_beta/client_v3.py b/tap_hubspot_beta/client_v3.py index 71bd9f1..336f4f0 100644 --- a/tap_hubspot_beta/client_v3.py +++ b/tap_hubspot_beta/client_v3.py @@ -38,6 +38,10 @@ def get_next_page_token( ) -> Optional[Any]: """Return a token for identifying next page or None if no more pages.""" all_matches = extract_jsonpath(self.next_page_token_jsonpath, response.json()) + + if not previous_token: + self.logger.info(f"Total records to fetch for stream = {self.name}: {response.json().get('total')}") + next_page_token = next(iter(all_matches), None) if next_page_token == "10000": start_date = self.stream_state.get("progress_markers", {}).get( From b47e006c49596e620d15f3159bb12c95a8d81b36 Mon Sep 17 00:00:00 2001 From: Keyna Rafael <95432445+keyn4@users.noreply.github.com> Date: Mon, 22 Jan 2024 12:55:56 -0500 Subject: [PATCH 14/16] add currencies exchange rates stream (#25) --- tap_hubspot_beta/streams.py | 21 +++++++++++++++++++++ tap_hubspot_beta/tap.py | 6 ++++-- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/tap_hubspot_beta/streams.py b/tap_hubspot_beta/streams.py index 30bdc5c..01866a5 100644 --- a/tap_hubspot_beta/streams.py +++ b/tap_hubspot_beta/streams.py @@ -1400,3 +1400,24 @@ class AssociationQuotesDealsStream(AssociationDealsStream): name = "associations_quotes_deals" path = "crm/v4/associations/deals/quotes/batch/read" + + +class CurrenciesStream(hubspotV3SearchStream): + """Owners Stream""" + + rest_method = "GET" + name = "currencies_exchange_rate" + path = "settings/v3/currencies/exchange-rates" + primary_keys = ["id"] + replication_key_filter = "updatedAt" + + schema = th.PropertiesList( + th.Property("createdAt", th.DateTimeType), + th.Property("toCurrencyCode", th.StringType), + th.Property("visibleInUI", th.BooleanType), + th.Property("effectiveAt", th.DateTimeType), + th.Property("id", th.StringType), + th.Property("conversionRate", th.NumberType), + th.Property("fromCurrencyCode", th.StringType), + th.Property("updatedAt", th.DateTimeType), + ).to_dict() \ No newline at end of file diff --git a/tap_hubspot_beta/tap.py b/tap_hubspot_beta/tap.py index 1fde780..a3baeac 100644 --- a/tap_hubspot_beta/tap.py +++ 
b/tap_hubspot_beta/tap.py @@ -46,7 +46,8 @@ ListSearchV3Stream, ArchivedCompaniesStream, ArchivedDealsStream, - DealsAssociationParent + DealsAssociationParent, + CurrenciesStream ) STREAM_TYPES = [ @@ -90,7 +91,8 @@ ListSearchV3Stream, ArchivedCompaniesStream, ArchivedDealsStream, - DealsAssociationParent + DealsAssociationParent, + CurrenciesStream ] From a7332b11dc72223d9bc255597efa93e2eeb0a924 Mon Sep 17 00:00:00 2001 From: Hassan Syyid Date: Thu, 14 Mar 2024 15:33:17 -0400 Subject: [PATCH 15/16] HGI-5523: Fix missing deal associations --- tap_hubspot_beta/streams.py | 97 +++++++++++++++++++++++++++++++++++-- 1 file changed, 94 insertions(+), 3 deletions(-) diff --git a/tap_hubspot_beta/streams.py b/tap_hubspot_beta/streams.py index d683da2..d263338 100644 --- a/tap_hubspot_beta/streams.py +++ b/tap_hubspot_beta/streams.py @@ -20,10 +20,14 @@ from tap_hubspot_beta.client_v4 import hubspotV4Stream import time import pytz -from singer_sdk.helpers._state import log_sort_error from pendulum import parse from urllib.parse import urlencode +from singer_sdk.helpers._state import ( + finalize_state_progress_markers, + log_sort_error +) + association_schema = th.PropertiesList( th.Property("from_id", th.StringType), th.Property("to_id", th.StringType), @@ -893,14 +897,101 @@ class DealsStream(ObjectSearchV3): def get_child_context(self, record: dict, context) -> dict: return {"id": record["id"]} -class DealsAssociationParent(DealsStream): - name = "deals_association_parent" + +class DealsAssociationParent(hubspotV1Stream): + name = "deals_association_parent" + path = "deals/v1/deal/paged" replication_key = None primary_keys = ["id"] + + records_jsonpath = "$.deals[*]" + schema = th.PropertiesList( th.Property("id", th.StringType), ).to_dict() + def post_process(self, row, context): + row = super().post_process(row, context) + row["id"] = str(row["dealId"]) + return row + + def get_child_context(self, record: dict, context) -> dict: + return {"id": record["id"]} + + def _sync_records( # noqa C901 # too complex + self, context: Optional[dict] = None + ) -> None: + """Sync records, emitting RECORD and STATE messages. 
""" + record_count = 0 + current_context: Optional[dict] + context_list: Optional[List[dict]] + context_list = [context] if context is not None else self.partitions + selected = self.selected + + for current_context in context_list or [{}]: + partition_record_count = 0 + current_context = current_context or None + state = self.get_context_state(current_context) + state_partition_context = self._get_state_partition_context(current_context) + self._write_starting_replication_value(current_context) + child_context: Optional[dict] = ( + None if current_context is None else copy.copy(current_context) + ) + child_context_bulk = {"ids": []} + for record_result in self.get_records(current_context): + if isinstance(record_result, tuple): + # Tuple items should be the record and the child context + record, child_context = record_result + else: + record = record_result + child_context = copy.copy( + self.get_child_context(record=record, context=child_context) + ) + for key, val in (state_partition_context or {}).items(): + # Add state context to records if not already present + if key not in record: + record[key] = val + + # Sync children, except when primary mapper filters out the record + if self.stream_maps[0].get_filter_result(record): + child_context_bulk["ids"].append(child_context) + if len(child_context_bulk["ids"])>=5000: + self._sync_children(child_context_bulk) + child_context_bulk = {"ids": []} + self._check_max_record_limit(record_count) + if selected: + if (record_count - 1) % self.STATE_MSG_FREQUENCY == 0: + self._write_state_message() + self._write_record_message(record) + try: + self._increment_stream_state(record, context=current_context) + except InvalidStreamSortException as ex: + log_sort_error( + log_fn=self.logger.error, + ex=ex, + record_count=record_count + 1, + partition_record_count=partition_record_count + 1, + current_context=current_context, + state_partition_context=state_partition_context, + stream_name=self.name, + ) + raise ex + + record_count += 1 + partition_record_count += 1 + if len(child_context_bulk): + self._sync_children(child_context_bulk) + if current_context == state_partition_context: + # Finalize per-partition state only if 1:1 with context + finalize_state_progress_markers(state) + if not context: + # Finalize total stream only if we have the full full context. + # Otherwise will be finalized by tap at end of sync. 
+ finalize_state_progress_markers(self.stream_state) + self._write_record_count_log(record_count=record_count, context=context) + # Reset interim bookmarks before emitting final STATE message: + self._write_state_message() + class ArchivedDealsStream(hubspotV3Stream): """Archived Deals Stream""" From 00f12f7d91cc46e0d15db9d57e2eee7ac6d1b912 Mon Sep 17 00:00:00 2001 From: Keyna Rafael <95432445+keyn4@users.noreply.github.com> Date: Thu, 6 Jun 2024 11:38:34 -0500 Subject: [PATCH 16/16] add curlify on error and back off for 400s (#65) * add curlify on error and back off for 400s * fix fatal api error * not backoff on 400 * put 400 as fatal error --- pyproject.toml | 1 + tap_hubspot_beta/client_base.py | 11 +++++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 35af3fe..6d60dea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ python = "<3.11,>=3.7.1" requests = "^2.25.1" singer-sdk = "^0.4.4" "backports.cached-property" = "^1.0.1" +curlify = "^2.2.1" [tool.poetry.dev-dependencies] pytest = "^6.2.5" diff --git a/tap_hubspot_beta/client_base.py b/tap_hubspot_beta/client_base.py index a51c751..937847c 100644 --- a/tap_hubspot_beta/client_base.py +++ b/tap_hubspot_beta/client_base.py @@ -13,6 +13,7 @@ from urllib3.exceptions import ProtocolError from singer_sdk.mapper import SameRecordTransform, StreamMap from singer_sdk.helpers._flattening import get_flattening_options +import curlify from pendulum import parse @@ -146,14 +147,20 @@ def validate_response(self, response: requests.Response) -> None: f"{response.status_code} Server Error: " f"{response.reason} for path: {self.path}" ) - raise RetriableAPIError(msg) + curl_command = curlify.to_curl(response.request) + logging.error(f"Response code: {response.status_code}, info: {response.text}") + logging.error(f"CURL command for failed request: {curl_command}") + raise RetriableAPIError(f"Msg {msg}, response {response.text}") elif 400 <= response.status_code < 500: msg = ( f"{response.status_code} Client Error: " f"{response.reason} for path: {self.path}" ) - raise FatalAPIError(msg) + curl_command = curlify.to_curl(response.request) + logging.error(f"Response code: {response.status_code}, info: {response.text}") + logging.error(f"CURL command for failed request: {curl_command}") + raise FatalAPIError(f"Msg {msg}, response {response.text}") @staticmethod def extract_type(field):
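
A short sketch (not part of the patch series above) of how the field_meta/stream_meta
enrichment introduced in patches 01 and 07-09 is expected to surface in the discovered
catalog. Everything below is illustrative: the config contents are placeholders, and
"dealname" is just an example HubSpot deal property, not a value taken from these patches.

    # Sketch only -- assumes valid tap settings; stream/property names are examples.
    from tap_hubspot_beta.tap import Taphubspot

    config = {}  # fill in the tap's HubSpot auth settings for your account
    tap = Taphubspot(config=config)

    # catalog_dict calls load_fields_metadata() for the deals, lineitems, contacts
    # and companies streams (patches 07-09) and attaches the fetched property
    # definitions to every field in each discovered schema.
    catalog = tap.catalog_dict
    deals = next(s for s in catalog["streams"] if s["tap_stream_id"] == "deals")

    print(deals["stream_meta"])  # currently always {}, reserved for stream-level metadata
    print(deals["schema"]["properties"]["dealname"]["field_meta"])
    # -> the raw property definition returned by the stream's properties endpoint,
    #    or {} when HubSpot does not report that property; for streams outside the
    #    whitelist above, field_meta is always {}.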