Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/hgi 4472 #22

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4b44d58
Adds support for field_meta/stream_meta on the catalog.json
vmesel Nov 6, 2023
3ee6961
Merge pull request #9 from hotgluexyz/master
hsyyid Nov 13, 2023
f6ee0f7
Merge branch 'master' into feature/hgi-4472
hsyyid Nov 15, 2023
3f6fb4c
Merge branch 'master' into feature/hgi-4472
hsyyid Nov 15, 2023
a760de6
Handle case where lineitems_archived is not in catalog
hsyyid Nov 16, 2023
a63dddf
Merge branch 'master' into feature/hgi-4472
hsyyid Nov 17, 2023
7b9a09b
Merge branch 'master' into feature/hgi-4472
hsyyid Dec 15, 2023
d4d97c1
Revert "Merge branch 'master' into feature/hgi-4472"
davi-souza Dec 15, 2023
5991198
Deals association separate stream with full sync (#15)
xacadil Dec 5, 2023
78f9f23
Fix to automatically add deals_association_parent stream if it is missing
xacadil Dec 6, 2023
70119f3
infinite loop fix for non-incremental Deals association parent stream …
xacadil Dec 8, 2023
5968aac
Merge pull request #18 from hotgluexyz/Add-DealsAssociation-full-sync…
hsyyid Dec 16, 2023
1b9d13c
Restricts which streams generate metadata
vmesel Dec 18, 2023
372fb31
Adds field meta
vmesel Dec 18, 2023
5c1db43
Adds companies to available fields_metadata
vmesel Dec 18, 2023
cf40af1
Merge branch 'master' into feature/hgi-4472
hsyyid Jan 8, 2024
623ba80
Merge branch 'master' into feature/hgi-4472
hsyyid Jan 8, 2024
85b0aab
add archived streams back
hsyyid Jan 8, 2024
0739e07
fix consistency
hsyyid Jan 8, 2024
6ae7009
Clean up
hsyyid Jan 8, 2024
808676d
add log of total records for companies stream
hsyyid Jan 10, 2024
b47e006
add currencies exchange rates stream (#25)
keyn4 Jan 22, 2024
8e43f1f
Merge branch 'master' into feature/hgi-4472
hsyyid Feb 12, 2024
81eb484
Merge branch 'master' into feature/hgi-4472
hsyyid Feb 15, 2024
eaa5c32
Merge branch 'master' into feature/hgi-4472
hsyyid Feb 27, 2024
dfd7eef
Merge branch 'master' into feature/hgi-4472
hsyyid Feb 28, 2024
a7332b1
HGI-5523: Fix missing deal associations
hsyyid Mar 14, 2024
00f12f7
add curlify on error and back off for 400s (#65)
keyn4 Jun 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ python = "<3.11,>=3.7.1"
requests = "^2.25.1"
singer-sdk = "^0.4.4"
"backports.cached-property" = "^1.0.1"
curlify = "^2.2.1"

[tool.poetry.dev-dependencies]
pytest = "^6.2.5"
Expand Down
11 changes: 9 additions & 2 deletions tap_hubspot_beta/client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from urllib3.exceptions import ProtocolError
from singer_sdk.mapper import SameRecordTransform, StreamMap
from singer_sdk.helpers._flattening import get_flattening_options
import curlify

from pendulum import parse

Expand Down Expand Up @@ -146,14 +147,20 @@ def validate_response(self, response: requests.Response) -> None:
f"{response.status_code} Server Error: "
f"{response.reason} for path: {self.path}"
)
raise RetriableAPIError(msg)
curl_command = curlify.to_curl(response.request)
logging.error(f"Response code: {response.status_code}, info: {response.text}")
logging.error(f"CURL command for failed request: {curl_command}")
raise RetriableAPIError(f"Msg {msg}, response {response.text}")

elif 400 <= response.status_code < 500:
msg = (
f"{response.status_code} Client Error: "
f"{response.reason} for path: {self.path}"
)
raise FatalAPIError(msg)
curl_command = curlify.to_curl(response.request)
logging.error(f"Response code: {response.status_code}, info: {response.text}")
logging.error(f"CURL command for failed request: {curl_command}")
raise FatalAPIError(f"Msg {msg}, response {response.text}")

@staticmethod
def extract_type(field):
Expand Down
4 changes: 4 additions & 0 deletions tap_hubspot_beta/client_v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ def get_next_page_token(
) -> Optional[Any]:
"""Return a token for identifying next page or None if no more pages."""
all_matches = extract_jsonpath(self.next_page_token_jsonpath, response.json())

if not previous_token:
self.logger.info(f"Total records to fetch for stream = {self.name}: {response.json().get('total')}")

next_page_token = next(iter(all_matches), None)
if next_page_token == "10000":
start_date = self.stream_state.get("progress_markers", {}).get(
Expand Down
108 changes: 96 additions & 12 deletions tap_hubspot_beta/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@
from tap_hubspot_beta.client_v4 import hubspotV4Stream
import time
import pytz
from singer_sdk.helpers._state import log_sort_error
from pendulum import parse
from urllib.parse import urlencode

from singer_sdk.helpers._state import (
finalize_state_progress_markers,
log_sort_error
)

association_schema = th.PropertiesList(
th.Property("from_id", th.StringType),
th.Property("to_id", th.StringType),
Expand Down Expand Up @@ -882,7 +886,6 @@ class TicketsStream(ObjectSearchV3):
replication_key_filter = "hs_lastmodifieddate"
properties_url = "properties/v2/tickets/properties"


class DealsStream(ObjectSearchV3):
"""Deals Stream"""

Expand All @@ -894,14 +897,101 @@ class DealsStream(ObjectSearchV3):
def get_child_context(self, record: dict, context) -> dict:
return {"id": record["id"]}

class DealsAssociationParent(DealsStream):
name = "deals_association_parent"

class DealsAssociationParent(hubspotV1Stream):
    """Parent stream that feeds deal ids to the association child streams.

    Pages through the v1 ``deals/v1/deal/paged`` endpoint, emits records that
    contain only the deal ``id``, and forwards those ids to child streams in
    batches of up to 5000 via a custom ``_sync_records`` implementation.
    """

    name = "deals_association_parent"
    path = "deals/v1/deal/paged"
    replication_key = None  # full-table sync; no incremental bookmark
    primary_keys = ["id"]

    records_jsonpath = "$.deals[*]"

    # Only the id is emitted; everything else is dropped by the schema.
    schema = th.PropertiesList(
        th.Property("id", th.StringType),
    ).to_dict()

    def post_process(self, row, context):
        """Expose the API's numeric ``dealId`` as the string ``id`` field."""
        row = super().post_process(row, context)
        row["id"] = str(row["dealId"])
        return row

    def get_child_context(self, record: dict, context) -> dict:
        """Context handed to child streams: just the deal id."""
        return {"id": record["id"]}

    def _sync_records(  # noqa C901 # too complex
        self, context: Optional[dict] = None
    ) -> None:
        """Sync records, emitting RECORD and STATE messages.

        Overrides the SDK implementation so that child streams are synced
        once per batch of 5000 child contexts (``{"ids": [...]}``) instead
        of once per record, which avoids one child request per deal.
        """
        record_count = 0
        current_context: Optional[dict]
        context_list: Optional[List[dict]]
        context_list = [context] if context is not None else self.partitions
        selected = self.selected

        for current_context in context_list or [{}]:
            partition_record_count = 0
            current_context = current_context or None
            state = self.get_context_state(current_context)
            state_partition_context = self._get_state_partition_context(current_context)
            self._write_starting_replication_value(current_context)
            child_context: Optional[dict] = (
                None if current_context is None else copy.copy(current_context)
            )
            # Accumulates child contexts; flushed every 5000 ids.
            child_context_bulk = {"ids": []}
            for record_result in self.get_records(current_context):
                if isinstance(record_result, tuple):
                    # Tuple items should be the record and the child context
                    record, child_context = record_result
                else:
                    record = record_result
                    child_context = copy.copy(
                        self.get_child_context(record=record, context=child_context)
                    )
                for key, val in (state_partition_context or {}).items():
                    # Add state context to records if not already present
                    if key not in record:
                        record[key] = val

                # Sync children, except when primary mapper filters out the record
                if self.stream_maps[0].get_filter_result(record):
                    child_context_bulk["ids"].append(child_context)
                    if len(child_context_bulk["ids"]) >= 5000:
                        self._sync_children(child_context_bulk)
                        child_context_bulk = {"ids": []}
                    self._check_max_record_limit(record_count)
                    if selected:
                        if (record_count - 1) % self.STATE_MSG_FREQUENCY == 0:
                            self._write_state_message()
                        self._write_record_message(record)
                        try:
                            self._increment_stream_state(record, context=current_context)
                        except InvalidStreamSortException as ex:
                            log_sort_error(
                                log_fn=self.logger.error,
                                ex=ex,
                                record_count=record_count + 1,
                                partition_record_count=partition_record_count + 1,
                                current_context=current_context,
                                state_partition_context=state_partition_context,
                                stream_name=self.name,
                            )
                            raise ex

                record_count += 1
                partition_record_count += 1
            # BUGFIX: the original guard was `len(child_context_bulk)`, which
            # is always 1 (the dict always has the "ids" key), so children were
            # synced even with an empty id list. Flush only when ids remain.
            if child_context_bulk["ids"]:
                self._sync_children(child_context_bulk)
            if current_context == state_partition_context:
                # Finalize per-partition state only if 1:1 with context
                finalize_state_progress_markers(state)
        if not context:
            # Finalize total stream only if we have the full context.
            # Otherwise will be finalized by tap at end of sync.
            finalize_state_progress_markers(self.stream_state)
        self._write_record_count_log(record_count=record_count, context=context)
        # Reset interim bookmarks before emitting final STATE message:
        self._write_state_message()


class ArchivedDealsStream(hubspotV3Stream):
"""Archived Deals Stream"""
Expand Down Expand Up @@ -1212,7 +1302,6 @@ class AssociationDealsStream(hubspotV4Stream):
th.Property("associationTypes", th.CustomType({"type": ["array", "object"]})),
).to_dict()


class AssociationContactsStream(hubspotV4Stream):
"""Association Base Stream"""

Expand Down Expand Up @@ -1249,14 +1338,12 @@ class AssociationDealsLineItemsStream(AssociationDealsStream):
name = "associations_deals_line_items"
path = "crm/v4/associations/deals/line_items/batch/read"


class AssociationContactsTicketsStream(AssociationContactsStream):
    """Batch-reads contact -> ticket associations via the CRM v4 API."""

    # Endpoint and stream identifier for the contacts/tickets association read.
    path = "crm/v4/associations/contacts/tickets/batch/read"
    name = "associations_contacts_tickets"


class AssociationContactsStream(hubspotV4Stream):
"""Association Base Stream"""

Expand All @@ -1279,7 +1366,6 @@ class AssociationContactsCompaniesStream(AssociationContactsStream):
name = "associations_contacts_companies"
path = "crm/v4/associations/contacts/companies/batch/read"


class MarketingEmailsStream(hubspotV1Stream):
"""Dispositions Stream"""

Expand Down Expand Up @@ -1383,7 +1469,6 @@ class MarketingEmailsStream(hubspotV1Stream):
th.Property("vidsIncluded", th.CustomType({"type": ["array", "string"]})),
).to_dict()


class PostalMailStream(ObjectSearchV3):
"""Owners Stream"""

Expand Down Expand Up @@ -1448,7 +1533,6 @@ class QuotesStream(ObjectSearchV3):
replication_key_filter = "hs_lastmodifieddate"
properties_url = "properties/v2/quotes/properties"


class AssociationQuotesDealsStream(AssociationDealsStream):
"""Association Quotes -> Deals Stream"""

Expand All @@ -1457,7 +1541,7 @@ class AssociationQuotesDealsStream(AssociationDealsStream):


class CurrenciesStream(hubspotV3Stream):
"""Owners Stream"""
"""Currencies Stream"""

name = "currencies_exchange_rate"
path = "settings/v3/currencies/exchange-rates"
Expand Down Expand Up @@ -1689,4 +1773,4 @@ class AssociationTasksDealsStream(AssociationTasksStream):
"""Association Tasks -> Deals Stream"""

name = "associations_tasks_deals"
path = "crm/v4/associations/tasks/deals/batch/read"
path = "crm/v4/associations/tasks/deals/batch/read"
17 changes: 8 additions & 9 deletions tap_hubspot_beta/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,14 @@ def catalog_dict(self) -> dict:
The tap's catalog as a dict
"""
catalog = super().catalog_dict
if self.config.get("catalog_metadata", False):
streams = self.streams
for stream in catalog["streams"]:
stream_class = streams[stream["tap_stream_id"]]
stream["stream_meta"] = {}
if hasattr(stream_class, "load_fields_metadata"):
stream_class.load_fields_metadata()
for field in stream["schema"]["properties"]:
stream["schema"]["properties"][field]["field_meta"] = stream_class.fields_metadata.get(field, {})
streams = self.streams
for stream in catalog["streams"]:
stream_class = streams[stream["tap_stream_id"]]
stream["stream_meta"] = {}
if hasattr(stream_class, "load_fields_metadata") and stream["stream"] in ["deals", "lineitems", "contacts", "companies"]:
stream_class.load_fields_metadata()
for field in stream["schema"]["properties"]:
stream["schema"]["properties"][field]["field_meta"] = stream_class.fields_metadata.get(field, {})
return catalog


Expand Down