diff --git a/tap_hubspot_beta/client_v3.py b/tap_hubspot_beta/client_v3.py index 4c5c96a..60f08aa 100644 --- a/tap_hubspot_beta/client_v3.py +++ b/tap_hubspot_beta/client_v3.py @@ -104,6 +104,7 @@ def prepare_request_payload( def post_process(self, row: dict, context: Optional[dict]) -> dict: """As needed, append or transform raw data to match expected structure.""" row = self.parse_properties(row) + row["isAssociated"] = False return row def _sync_records( # noqa C901 # too complex diff --git a/tap_hubspot_beta/streams.py b/tap_hubspot_beta/streams.py index 0f41197..4970ac2 100644 --- a/tap_hubspot_beta/streams.py +++ b/tap_hubspot_beta/streams.py @@ -1,6 +1,6 @@ """Stream type classes for tap-hubspot.""" from datetime import datetime -from typing import Any, Dict, Iterable, List, Optional +from typing import Any, Dict, Iterable, List, Optional, Generator import copy from singer_sdk.exceptions import InvalidStreamSortException @@ -23,6 +23,13 @@ from singer_sdk.helpers._state import log_sort_error from pendulum import parse from urllib.parse import urlencode +from singer_sdk.helpers._singer import ( + MetadataMapping, +) +from singer import SchemaMessage, RecordMessage +from singer_sdk.helpers._util import utc_now +from singer_sdk.helpers._typing import conform_record_data_types + class AccountStream(hubspotV1Stream): """Account Stream""" @@ -749,8 +756,29 @@ class ObjectSearchV3(hubspotV3SearchStream): th.Property("updatedAt", th.DateTimeType), th.Property("archived", th.BooleanType), th.Property("archivedAt", th.DateTimeType), + th.Property("isAssociated", th.BooleanType), ] + def add_fetched_ids(self, record_id): + # add fetched ids to a global var to avoid dups when fetching associated object records + fetched_ids = self._tap.fetched_objects_ids.get(self.name) + if fetched_ids: + fetched_ids.append(record_id) + else: + self._tap.fetched_objects_ids[self.name] = [record_id] + + def get_child_context(self, record: dict, context) -> dict: + record_id = record["id"] + # add fetched ids to a global var to avoid dups when fetching associated object records + self.add_fetched_ids(record_id) + return {"id": record_id} + + def post_process(self, row, context) -> dict: + row = super().post_process(row, context) + fetched_ids = self._tap.fetched_objects_ids.get(self.name, []) + if row["id"] not in fetched_ids: + return row + class ContactsV3Stream(ObjectSearchV3): """Contacts Stream""" @@ -779,9 +807,6 @@ def apply_catalog(self, catalog) -> None: if catalog_entry.replication_method: self.forced_replication_method = catalog_entry.replication_method - def get_child_context(self, record: dict, context) -> dict: - return {"id": record["id"]} - class CompaniesStream(ObjectSearchV3): """Companies Stream""" @@ -1441,4 +1466,381 @@ class CurrenciesStream(hubspotV3SearchStream): th.Property("conversionRate", th.NumberType), th.Property("fromCurrencyCode", th.StringType), th.Property("updatedAt", th.DateTimeType), - ).to_dict() \ No newline at end of file + ).to_dict() + + +#--- Base stream for dynamic associations based in config flag + +class DynamicAssociationsStream(hubspotV4Stream): + """Base association stream for 2 objects in hubspot - Static Object -> Dynamic Object""" + + associated_object = None + assoc_index = None + object_name = None + + schema = th.PropertiesList( + th.Property("from_id", th.StringType), + th.Property("to_id", th.StringType), + th.Property("typeId", th.NumberType), + th.Property("category", th.StringType), + th.Property("label", th.StringType), + th.Property("associationTypes", th.CustomType({"type": ["array", "object"]})), + ).to_dict() + + @property + def path(self): + self.assoc_index = self.assoc_index or 0 + if len(self.association_objects) > self.assoc_index: + self.associated_object = self.association_objects[self.assoc_index] + self._write_schema_message() + if self.associated_object: + return f"crm/v4/associations/{self.object_name}/{self.associated_object}/batch/read" + + def parent_name(self): + parent_name = self.parent_stream_type.name + selected_asc = self.config.get("fetch_associations", {}) + return parent_name if selected_asc.get(parent_name) else parent_name.split("_v3")[0] + + @property + def association_objects(self): + parent_name = self.parent_name() + selected_asc = self.config.get("fetch_associations", {}) + + if selected_asc.get(parent_name) and self.parent_stream_type.selected: + return selected_asc[parent_name] + else: + return [] + + @property + def selected(self) -> bool: + # selects the association stream by default if fetch_associations is in config and base stream is selected + if not self.association_objects: + return False + else: + return self._tap.catalog[self.parent_name()].metadata.get(()).selected + + @property + def metadata(self): + # select by default all properties + new_metadata = super().metadata + for field in new_metadata: + new_metadata[field].selected = True + return new_metadata + + def get_next_page_token(self, response, previous_token): + # iterate through all the associations set for the same object in fetch_associations + previous_token = previous_token or 0 + next_page_token = previous_token + 1 + if next_page_token < len(self.association_objects): + self.assoc_index = next_page_token + return next_page_token + else: + self.assoc_index = None + + def get_child_context(self, record, context) -> dict: + associated_id = record["to_id"] + # check if associated record has been fetched to avoid dups + fetched_ids = self._tap.fetched_objects_ids.get(self.associated_object, []) + if associated_id not in fetched_ids: + if fetched_ids: + fetched_ids.append(associated_id) + else: + self._tap.fetched_objects_ids[self.associated_object] = [associated_id] + return {"associated_id": record["to_id"], "object": self.associated_object} + + def _sync_children(self, child_context: dict) -> None: + if child_context: + return super()._sync_children(child_context) + + def _sync_records(self, context) -> None: + if self.association_objects: + return super()._sync_records(context) + + # write the schema and record names after the association object + @property + def stream_alias(self): + # name for schema and output file + return f"associations_{self.object_name}_{self.associated_object}" + + def _write_record_message(self, record: dict) -> None: + for record_message in self._generate_record_messages(record): + # force this to think it's the companies stream + record_message.stream = self.stream_alias + singer.write_message(record_message) + + def _write_schema_message(self) -> None: + for schema_message in self._generate_schema_messages(): + if self.associated_object: + schema_message.stream = self.stream_alias + singer.write_message(schema_message) + + +# child classes of dynamic association +class ContactsAssociationsStream(DynamicAssociationsStream): + name = "contacts_associations" + parent_stream_type = ContactsV3Stream + object_name = "contacts" + + +class MeetingsAssociationsStream(DynamicAssociationsStream): + name = "meetings_associations" + parent_stream_type = MeetingsStream + object_name = "meetings" + + +class CallsAssociationsStream(DynamicAssociationsStream): + name = "calls_associations" + parent_stream_type = CallsStream + object_name = "calls" + + +class CommunicationsAssociationsStream(DynamicAssociationsStream): + name = "communications_associations" + parent_stream_type = CommunicationsStream + object_name = "communications" + + +class EmailsAssociationsStream(DynamicAssociationsStream): + name = "emails_associations" + parent_stream_type = EmailsStream + object_name = "emails" + + +class NotesAssociationsStream(DynamicAssociationsStream): + name = "notes_associations" + parent_stream_type = NotesStream + object_name = "notes" + + +class PostalMailAssociationsStream(DynamicAssociationsStream): + name = "postal_mail_associations" + parent_stream_type = PostalMailStream + object_name = "postal_mail" + + +class TasksAssociationsStream(DynamicAssociationsStream): + name = "tasks_associations" + parent_stream_type = TasksStream + object_name = "tasks" + + +class CompaniesAssociationsStream(DynamicAssociationsStream): + name = "companies_associations" + parent_stream_type = CompaniesStream + object_name = "companies" + + +class TicketsAssociationsStream(DynamicAssociationsStream): + name = "tickets_associations" + parent_stream_type = TicketsStream + object_name = "tickets" + + +class ProductsAssociationsStream(DynamicAssociationsStream): + name = "products_associations" + parent_stream_type = ProductsStream + object_name = "products" + + +class QuotesAssociationsStream(DynamicAssociationsStream): + name = "quotes_associations" + parent_stream_type = QuotesStream + object_name = "quotes" + + +class DealsAssociationsStream(DynamicAssociationsStream): + name = "deals_associations" + parent_stream_type = DealsStream + object_name = "deals" + + +class AssociatedObjects(ObjectSearchV3): + associated_object = None + replication_key = None + visible_in_catalog = False + + @property + def path(self): + return f"crm/v3/objects/{self.associated_object}/search" + + @property + def schema(self): + if self.associated_object: + object_schema = self._tap.catalog[self.associated_object].schema + return object_schema.to_dict() + # placeholder for discover + return th.PropertiesList( + th.Property("id", th.StringType), + ).to_dict() + + @property + def selected(self) -> bool: + # selects by default if parent class is selected + parent_name = self.parent_stream_type.parent_stream_type.name + selected_asc = self.config.get("fetch_associations", {}) + if selected_asc.get(parent_name) and self.parent_stream_type.selected: + return True + + # write the schema and record names after the association object + @property + def stream_alias(self): + # name for schema and output file + return self.associated_object + + def _generate_record_messages( + self, + record: dict, + ) -> Generator[RecordMessage, None, None]: + record = conform_record_data_types( + stream_name=self.name, + row=record, + schema=self.schema, + logger=self.logger, + ) + for stream_map in self.stream_maps: + mapped_record = stream_map.transform(record) + # Emit record if not filtered + if mapped_record is not None: + record_message = RecordMessage( + stream=stream_map.stream_alias, + record=mapped_record, + version=None, + time_extracted=utc_now(), + ) + yield record_message + + def _write_record_message(self, record: dict) -> None: + for record_message in self._generate_record_messages(record): + # force this to think it's the companies stream + record_message.stream = self.stream_alias + singer.write_message(record_message) + + def _write_schema_message(self) -> None: + schema_message = SchemaMessage( + self.stream_alias, + self.schema, + ['id'], + None, + ) + schema_message.stream = self.stream_alias + singer.write_message(schema_message) + + def _sync_records(self, context) -> None: + # get object that will be fetched + self.associated_object = context.pop("object", None) + catalog_entry = self._tap_input_catalog.get_stream(self.associated_object) + if catalog_entry: + # update schema and metadata for object + self.schema + self.metadata + self._write_schema_message() + return super()._sync_records(context) + + @cached_property + def selected_properties(self): + metadata = {} + # get fields from associated object schema + if self._tap_input_catalog and self.associated_object: + catalog_entry = self._tap_input_catalog.get_stream(self.associated_object) + if catalog_entry: + metadata = catalog_entry.metadata + + # get list of all properties for the payload + selected_properties = [] + for key, value in metadata.items(): + if isinstance(key, tuple) and len(key) == 2: + selected_properties.append(key[-1]) + return selected_properties + + def prepare_request_payload(self, context, next_page_token): + self.add_fetched_ids(context["associated_id"]) + return { + "properties": self.selected_properties, + "filterGroups": [ + { + "filters": [ + { + "propertyName": "hs_object_id", + "value": context["associated_id"], + "operator": "EQ" + } + ] + } + ] + } + + def get_next_page_token(self, response, previous_token): + return None + + def post_process(self, row, context) -> dict: + for name, value in row["properties"].items(): + row[name] = self.parse_value(name, value) + del row["properties"] + row["isAssociated"] = True + return row + + +class ContactsAssociatedRecords(AssociatedObjects): + name = "contacts_associated_records" + parent_stream_type = ContactsAssociationsStream + + +class MeetingsAssociatedRecords(AssociatedObjects): + name = "meetings_associated_records" + parent_stream_type = MeetingsAssociationsStream + + +class CallsAssociatedRecords(AssociatedObjects): + name = "calls_associated_records" + parent_stream_type = CallsAssociationsStream + + +class CommunicationsAssociatedRecords(AssociatedObjects): + name = "communications_associated_records" + parent_stream_type = CommunicationsAssociationsStream + + +class EmailsAssociatedRecords(AssociatedObjects): + name = "emails_associated_records" + parent_stream_type = EmailsAssociationsStream + + +class NotesAssociatedRecords(AssociatedObjects): + name = "notes_associated_records" + parent_stream_type = NotesAssociationsStream + + +class PostalMailAssociatedRecords(AssociatedObjects): + name = "postal_mail_associated_records" + parent_stream_type = PostalMailAssociationsStream + object_name = "postal_mail" + + +class TasksAssociatedRecords(AssociatedObjects): + name = "tasks_associated_records" + parent_stream_type = TasksAssociationsStream + + +class CompaniesAssociatedRecords(AssociatedObjects): + name = "companies_associated_records" + parent_stream_type = CompaniesAssociationsStream + + +class TicketsAssociatedRecords(AssociatedObjects): + name = "tickets_associated_records" + parent_stream_type = TicketsAssociationsStream + + +class ProductsAssociatedRecords(AssociatedObjects): + name = "products_associated_records" + parent_stream_type = ProductsAssociationsStream + + +class QuotesAssociatedRecords(AssociatedObjects): + name = "quotes_associated_records" + parent_stream_type = QuotesAssociationsStream + + +class DealsAssociatedRecords(AssociatedObjects): + name = "deals_associated_records" + parent_stream_type = DealsAssociationsStream diff --git a/tap_hubspot_beta/tap.py b/tap_hubspot_beta/tap.py index 48eb124..3a48e16 100644 --- a/tap_hubspot_beta/tap.py +++ b/tap_hubspot_beta/tap.py @@ -50,7 +50,33 @@ ArchivedCompaniesStream, ArchivedDealsStream, DealsAssociationParent, - CurrenciesStream + CurrenciesStream, + ContactsAssociationsStream, + MeetingsAssociationsStream, + CallsAssociationsStream, + CommunicationsAssociationsStream, + EmailsAssociationsStream, + NotesAssociationsStream, + PostalMailAssociationsStream, + TasksAssociationsStream, + QuotesAssociationsStream, + ProductsAssociationsStream, + TicketsAssociationsStream, + CompaniesAssociationsStream, + DealsAssociationsStream, + ContactsAssociatedRecords, + QuotesAssociatedRecords, + ProductsAssociatedRecords, + TicketsAssociatedRecords, + CompaniesAssociatedRecords, + TasksAssociatedRecords, + PostalMailAssociatedRecords, + NotesAssociatedRecords, + EmailsAssociatedRecords, + CommunicationsAssociatedRecords, + CallsAssociatedRecords, + MeetingsAssociatedRecords, + DealsAssociatedRecords ) STREAM_TYPES = [ @@ -95,7 +121,33 @@ # ArchivedCompaniesStream, # ArchivedDealsStream, # DealsAssociationParent, - CurrenciesStream + CurrenciesStream, + ContactsAssociationsStream, + MeetingsAssociationsStream, + CallsAssociationsStream, + CommunicationsAssociationsStream, + EmailsAssociationsStream, + NotesAssociationsStream, + PostalMailAssociationsStream, + TasksAssociationsStream, + QuotesAssociationsStream, + DealsAssociationsStream, + ProductsAssociationsStream, + TicketsAssociationsStream, + CompaniesAssociationsStream, + ContactsAssociatedRecords, + QuotesAssociatedRecords, + ProductsAssociatedRecords, + TicketsAssociatedRecords, + CompaniesAssociatedRecords, + TasksAssociatedRecords, + PostalMailAssociatedRecords, + NotesAssociatedRecords, + EmailsAssociatedRecords, + CommunicationsAssociatedRecords, + CallsAssociatedRecords, + MeetingsAssociatedRecords, + DealsAssociatedRecords ] @@ -114,6 +166,8 @@ def __init__( ) -> None: self.config_file = config[0] super().__init__(config, catalog, state, parse_env_config, validate_config) + + fetched_objects_ids = {} config_jsonschema = th.PropertiesList( th.Property("client_id", th.StringType, required=True), @@ -247,6 +301,35 @@ def load_streams(self) -> List[Stream]: reverse=False, ) + @final + def sync_all(self) -> None: + """Sync all streams.""" + self._reset_state_progress_markers() + self._set_compatible_replication_methods() + stream: "Stream" + # force dynamic associations streams to be synced at the end: + associations_stream = {stream_name: stream for stream_name, stream in self.streams.items() if ("_associations" in stream_name or "_associated_records" in stream_name)} + # pop associations streams from self.streams to order them later + [self.streams.pop(stream_name) for stream_name in associations_stream] + # order self.streams to sync associations at the end + self.streams.update(associations_stream) + + for stream in self.streams.values(): + if not stream.selected and not stream.has_selected_descendents: + self.logger.info(f"Skipping deselected stream '{stream.name}'.") + continue + + if stream.parent_stream_type: + self.logger.debug( + f"Child stream '{type(stream).__name__}' is expected to be called " + f"by parent stream '{stream.parent_stream_type.__name__}'. " + "Skipping direct invocation." + ) + continue + + stream.sync() + stream.finalize_state_progress_markers() + if __name__ == "__main__": Taphubspot.cli()