Clean Diff #71

Open

wants to merge 35 commits into base: feature/hgi-4472 from add-fullsync_companies-stream
Changes from all commits

Commits (35)
8ee0294
add fullsync companies stream
keyn4 Jan 10, 2024
5824679
fixes on fullsync companies
keyn4 Jan 10, 2024
acdfa4e
add parsing for dates in client_v2
keyn4 Jan 12, 2024
677e394
minor fix
hsyyid Jan 12, 2024
7174ce1
fix basic fields for fullsync_companies
keyn4 Jan 13, 2024
5e46d25
fix
hsyyid Jan 13, 2024
ca8a7c0
use fullsync_companies and companies dynamically
keyn4 Jan 15, 2024
5f1d2c1
fix companies incremental sync logic
keyn4 Jan 16, 2024
2b29242
Merge branch 'master' into add-fullsync_companies-stream
hsyyid Jan 23, 2024
06b2ddb
Merge branch 'master' into add-fullsync_companies-stream
hsyyid Feb 27, 2024
3db0759
Merge branch 'master' into add-fullsync_companies-stream
hsyyid Feb 28, 2024
c145655
add archive val in archived stream as _hg_archived (#35)
keyn4 Mar 4, 2024
a2b63b1
add streams for properties with history (#37)
keyn4 Mar 12, 2024
376053b
change approach for no bulk child streams (#43)
keyn4 Mar 18, 2024
76cfd7f
add pk to history streams and get properties (#44)
keyn4 Mar 18, 2024
07386c2
format date time fields for hubspotV2Stream as standard format for hu…
keyn4 Apr 30, 2024
c171b68
Merge branch 'feature/hgi-4472' into add-fullsync_companies-stream
hsyyid Jul 16, 2024
6773a6e
Disable fullsync companies
hsyyid Jul 17, 2024
1b16a22
comment fullsync companies logic
keyn4 Jul 17, 2024
14a852a
deduplicate cols in dynamic schema(case insensitive) (#77)
keyn4 Aug 29, 2024
3218765
HG-3499: parse empty strings as booleans
hsyyid Oct 9, 2024
3ddf12c
only apply str handling if str
hsyyid Oct 9, 2024
2a19440
Revert "Disable fullsync companies"
hsyyid Oct 13, 2024
d5e648d
add safety checks on fullsync stream
hsyyid Oct 13, 2024
0ed600b
lower cap
hsyyid Oct 13, 2024
925f2f0
Add _hg_archived_at to fsc
hsyyid Oct 13, 2024
f85afa4
add owners archived stream
keyn4 Nov 6, 2024
5feb632
fix owners archived selected logic
keyn4 Nov 6, 2024
bc99254
Fix comment
hsyyid Nov 6, 2024
d647677
Add userIdIncludingInactive to Owners schema (#98)
brenhogan Nov 15, 2024
4837cef
Remove rep key for archived owners
brenhogan Nov 15, 2024
f010c4b
Revert archivedowners rep key
brenhogan Nov 15, 2024
c78153e
enable archived streams to not have a replication_key (#99)
arilton Nov 18, 2024
101ec5e
fetch deleted stages (#107)
keyn4 Jan 3, 2025
f7ba964
bump deleted stages logic (#109)
keyn4 Jan 7, 2025
65 changes: 63 additions & 2 deletions tap_hubspot_beta/client_base.py
@@ -91,6 +91,24 @@ def request_records(self, context):

while not finished:
logging.getLogger("backoff").setLevel(logging.CRITICAL)

# only use companies stream for incremental syncs
if self.name == "companies":
fullsync_companies_state = self.tap_state.get("bookmarks", {}).get("fullsync_companies", {})
fullsync_on = False
try:
# Check if the fullsync stream is selected or not
fullsync_on = [s for s in self._tap.streams.items() if str(s[0]) == "fullsync_companies"][0][1].selected
except:
pass
if fullsync_on and not fullsync_companies_state.get("replication_key") and self.is_first_sync():
finished = True
yield from []
break
elif fullsync_companies_state.get("replication_key") and self.is_first_sync():
self.stream_state.update(fullsync_companies_state)
self.stream_state["starting_replication_value"] = self.stream_state["replication_key_value"]

prepared_request = self.prepare_request(
context, next_page_token=next_page_token
)
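The gate above reads the fullsync_companies bookmark out of the tap state and either skips the companies stream entirely (first sync with fullsync_companies selected but not yet bookmarked) or seeds the companies state from that bookmark. A minimal sketch of the state shape those lookups assume; only the key names come from the code above, the values are hypothetical.

example_tap_state = {
    "bookmarks": {
        "fullsync_companies": {
            "replication_key": "hs_lastmodifieddate",              # hypothetical value
            "replication_key_value": "2024-04-24T20:20:53.386Z",   # hypothetical value
        }
    }
}

fullsync_state = example_tap_state.get("bookmarks", {}).get("fullsync_companies", {})
# First sync, fullsync_companies selected, no bookmark yet: companies yields nothing.
# First sync with an existing fullsync bookmark: companies starts from that bookmark value.
starting_value = fullsync_state.get("replication_key_value")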
@@ -189,11 +207,23 @@ def schema(self):
headers.update(self.authenticator.auth_headers or {})
url = self.url_base + self.properties_url
response = self.request_decorator(self.request_schema)(url, headers=headers)

fields = response.json()

deduplicate_columns = self.config.get("deduplicate_columns", True)
base_properties = []
if isinstance(self.base_properties, list):
base_properties = [property.name.lower() for property in self.base_properties]

for field in fields:
field_name = field.get("name")
# filter duplicated columns (case insensitive)
if deduplicate_columns:
if field_name.lower() in base_properties:
self.logger.info(f"Not including field {field_name} in catalog as it's a duplicate(case insensitive) of a base property for stream {self.name}")
continue

if not field.get("deleted"):
property = th.Property(field.get("name"), self.extract_type(field))
property = th.Property(field_name, self.extract_type(field))
properties.append(property)
return th.PropertiesList(*properties).to_dict()

@@ -308,6 +338,37 @@ def stream_maps(self) -> List[StreamMap]:
]
return self._stream_maps

def process_row_types(self,row) -> Dict[str, Any]:
schema = self.schema['properties']
# If the row is null we ignore
if row is None:
return row

for field, value in row.items():
if field not in schema:
# Skip fields not found in the schema
continue

field_info = schema[field]
field_type = field_info.get("type", ["null"])[0]

if field_type == "boolean":
if value is None:
row[field] = False
elif isinstance(value, str):
# Attempt to cast to boolean
if value.lower() == "true":
row[field] = True
elif value == "" or value.lower() == "false":
row[field] = False

return row

def is_first_sync(self):
if self.stream_state.get("replication_key"):
return False
return True
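process_row_types normalizes boolean-typed columns so downstream targets see real booleans: None and "" become False, "true"/"false" strings are cast, and everything else is left alone. A rough before/after illustration, assuming a schema entry of {"type": ["boolean", "null"]} for each flag field; the field names and values are invented.

# Illustration only; assumes is_active, paying and trial are typed ["boolean", "null"] in self.schema.
row_in  = {"is_active": "", "paying": "true", "trial": None, "name": "Acme"}
# After process_row_types, non-boolean fields are untouched:
row_out = {"is_active": False, "paying": True, "trial": False, "name": "Acme"}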


class hubspotStreamSchema(hubspotStream):

1 change: 1 addition & 0 deletions tap_hubspot_beta/client_v1.py
@@ -73,4 +73,5 @@ def post_process(self, row: dict, context: Optional[dict]) -> dict:
dt_field = datetime.fromtimestamp(int(row[field]) / 1000)
dt_field = dt_field.replace(tzinfo=None)
row[field] = dt_field.isoformat()
row = self.process_row_types(row)
return row
76 changes: 76 additions & 0 deletions tap_hubspot_beta/client_v2.py
@@ -0,0 +1,76 @@

import logging
from datetime import datetime
from typing import Any, Dict, Optional, Iterable

import requests
from singer_sdk.helpers.jsonpath import extract_jsonpath

from tap_hubspot_beta.client_base import hubspotStreamSchema
import copy


class hubspotV2Stream(hubspotStreamSchema):
"""hubspot stream class."""

def get_url_params(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
"""Return a dictionary of values to be used in URL parameterization."""
params: dict = {}
params["limit"] = self.page_size
params.update(self.additional_prarams)
if self.properties_url:
params["properties"] = self.selected_properties
if next_page_token:
params["offset"] = next_page_token["offset"]
return params

def post_process(self, row: dict, context: Optional[dict]) -> dict:
"""As needed, append or transform raw data to match expected structure."""
if self.properties_url:
for name, value in row.get("properties").items():
row[name] = value.get("value")
row["id"] = str(row["companyId"])
del row["properties"]
for field in self.datetime_fields:
if row.get(field) is not None:
if row.get(field) in [0, ""]:
row[field] = None
else:
# format datetime as hubspot standard ex. 2024-04-24T20:20:53.386Z
dt_field = datetime.fromtimestamp(int(row[field]) / 1000)
row[field] = dt_field.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
row["updatedAt"] = row["hs_lastmodifieddate"]
row["createdAt"] = row["createdate"]
row["archived"] = False
row = self.process_row_types(row)
return row
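The datetime handling above takes HubSpot v1 epoch-millisecond values and renders them in the v3-style string called out in the comment. A one-off sketch of that conversion; the input timestamp is illustrative.

from datetime import datetime

ms = 1713990053386                      # illustrative epoch milliseconds
dt = datetime.fromtimestamp(ms / 1000)  # naive datetime in the runtime's timezone, as in the diff
print(dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z")
# 2024-04-24T20:20:53.386Z when the runtime timezone is UTC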

def request_records(self, context: Optional[dict]) -> Iterable[dict]:
next_page_token: Any = None
finished = False
decorated_request = self.request_decorator(self._request)

while not finished:
prepared_request = self.prepare_request(
context, next_page_token=next_page_token
)
# only use fullsync_companies in the first sync
if self.name == "fullsync_companies" and not self.is_first_sync():
finished = True
yield from []
break
resp = decorated_request(prepared_request, context)
yield from self.parse_response(resp)
previous_token = copy.deepcopy(next_page_token)
next_page_token = self.get_next_page_token(
response=resp, previous_token=previous_token
)
if next_page_token and next_page_token == previous_token:
raise RuntimeError(
f"Loop detected in pagination. "
f"Pagination token {next_page_token} is identical to prior token."
)
# Cycle until get_next_page_token() no longer returns a value
finished = not next_page_token
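get_url_params above expects the pagination token as a dict with an "offset" key, and request_records raises if the legacy endpoint ever returns the same token twice in a row. A sketch of the params built for a second page under that assumption; the page size, property names, and offset value are invented.

# Illustrative only: token shape inferred from next_page_token["offset"] above.
page_size = 250
selected_properties = ["name", "domain"]
next_page_token = {"offset": 9150}

params = {"limit": page_size, "properties": selected_properties}
if next_page_token:
    params["offset"] = next_page_token["offset"]
# params == {"limit": 250, "properties": ["name", "domain"], "offset": 9150}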
44 changes: 42 additions & 2 deletions tap_hubspot_beta/client_v3.py
@@ -8,6 +8,9 @@

from tap_hubspot_beta.client_base import hubspotStream
from pendulum import parse
from singer_sdk import typing as th
import singer


from singer_sdk.exceptions import InvalidStreamSortException
from singer_sdk.helpers._state import (
@@ -111,12 +114,16 @@ def post_process(self, row: dict, context: Optional[dict]) -> dict:
for name, value in row["properties"].items():
row[name] = value
del row["properties"]
# store archived value in _hg_archived
row["_hg_archived"] = False
row = self.process_row_types(row)
return row

def _sync_records( # noqa C901 # too complex
self, context: Optional[dict] = None
) -> None:
"""Sync records, emitting RECORD and STATE messages. """

record_count = 0
current_context: Optional[dict]
context_list: Optional[List[dict]]
@@ -187,6 +194,16 @@ def _sync_records( # noqa C901 # too complex
# Reset interim bookmarks before emitting final STATE message:
self._write_state_message()


def _sync_children(self, child_context: dict) -> None:
for child_stream in self.child_streams:
if child_stream.selected or child_stream.has_selected_descendents:
if not child_stream.bulk_child:
ids = child_context.get("ids") or []
for id in ids:
child_stream.sync(context=id)
else:
child_stream.sync(context=child_context)
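With this override, child streams marked bulk_child (for example the V4 streams, which set bulk_child = True later in this diff) are synced once with the whole parent context, while non-bulk children are synced once per entry in context["ids"]. A rough sketch of the two call patterns; the context contents and the exact shape of each id entry are assumptions.

# Illustrative parent context; the ids list and its element shape are assumptions.
child_context = {"ids": [{"id": "901"}, {"id": "902"}]}

# bulk_child stream: one call with the full context.
#     child_stream.sync(context=child_context)
# non-bulk stream: one call per id entry.
#     for id in child_context.get("ids") or []:
#         child_stream.sync(context=id)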

class hubspotV3Stream(hubspotStream):
"""hubspot stream class."""
@@ -209,7 +226,12 @@ def get_url_params(
params["limit"] = self.page_size
params.update(self.additional_prarams)
if self.properties_url:
params["properties"] = ",".join(self.selected_properties)
# requesting either properties or properties with history
# if we send both it returns an error saying the url is too long
if params.get("propertiesWithHistory"):
params["propertiesWithHistory"] = ",".join(self.selected_properties)
else:
params["properties"] = ",".join(self.selected_properties)
if next_page_token:
params["after"] = next_page_token
return params
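Because sending both properties and propertiesWithHistory makes the request URL long enough for HubSpot to reject it, the hunk above populates only one of the two. A sketch of the two resulting param shapes; the property names and limit are invented.

# Illustrative only; selected property names are invented.
selected = ["name", "domain", "hs_lastmodifieddate"]

plain_params = {"limit": 100, "properties": ",".join(selected)}
# -> properties=name,domain,hs_lastmodifieddate

history_params = {"limit": 100, "propertiesWithHistory": ",".join(selected)}
# -> propertiesWithHistory=name,domain,hs_lastmodifieddate, with no "properties" key at all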
@@ -220,6 +242,7 @@ def post_process(self, row: dict, context: Optional[dict]) -> dict:
for name, value in row["properties"].items():
row[name] = value
del row["properties"]
row = self.process_row_types(row)
return row


@@ -275,4 +298,21 @@ def post_process(self, row: dict, context: Optional[dict]) -> dict:
for name, value in row["properties"].items():
row[name] = value
del row["properties"]
return row
row = self.process_row_types(row)
return row

class hubspotHistoryV3Stream(hubspotV3Stream):

def post_process(self, row: dict, context) -> dict:
row = super().post_process(row, context)
props = row.get("propertiesWithHistory") or dict()
row["propertiesWithHistory"] = {k:v for (k,v) in props.items() if v}
row = {k:v for k,v in row.items() if k in ["id", "propertiesWithHistory", "createdAt", "updatedAt", "archived", "archivedAt"]}
return row
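For history streams, post_process keeps only the identifying and audit columns plus the non-empty entries of propertiesWithHistory. A rough before/after illustration; the record contents are invented.

# Invented record, flattened by the parent post_process before this override filters it.
row_in = {
    "id": "1501",
    "name": "Acme",                                 # flattened property, dropped below
    "propertiesWithHistory": {
        "dealstage": [{"value": "closedwon"}],      # kept: non-empty history
        "amount": [],                               # dropped: empty history
    },
    "createdAt": "2024-01-10T00:00:00Z",
    "updatedAt": "2024-03-12T00:00:00Z",
    "archived": False,
}

row_out = {
    "id": "1501",
    "propertiesWithHistory": {"dealstage": [{"value": "closedwon"}]},
    "createdAt": "2024-01-10T00:00:00Z",
    "updatedAt": "2024-03-12T00:00:00Z",
    "archived": False,
}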

def _write_schema_message(self) -> None:
"""Write out a SCHEMA message with the stream schema."""
for schema_message in self._generate_schema_messages():
schema_message.schema = th.PropertiesList(*self.base_properties).to_dict()
singer.write_message(schema_message)

1 change: 1 addition & 0 deletions tap_hubspot_beta/client_v4.py
@@ -19,6 +19,7 @@ class hubspotV4Stream(hubspotStream):

rest_method = "POST"
records_jsonpath = "$.results[*]"
bulk_child = True

def get_url(self, context: Optional[dict]) -> str:
"""Get stream entity URL. """