Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parameter splitting into multiple calls with stitching the responses back #93

Open
wants to merge 9 commits into
base: HGI-6444
Choose a base branch
from
101 changes: 89 additions & 12 deletions tap_hubspot_beta/client_base.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
"""REST client handling, including hubspotStream base class."""
import copy
import json
import logging

import requests
import backoff
from copy import deepcopy
from typing import Any, Dict, Optional, cast, List
from typing import Any, Dict, Optional, Union, cast, List
from backports.cached_property import cached_property
from singer_sdk import typing as th
from singer_sdk.exceptions import FatalAPIError, RetriableAPIError
Expand Down Expand Up @@ -84,6 +85,68 @@ def last_job(self):
return parse(last_job.get("value"))
return

def prepare_request(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Union[requests.PreparedRequest, List[requests.PreparedRequest]]:
http_method: str = self.rest_method
url: str = self.get_url(context)
params: Dict[str, Any] = self.get_url_params(context, next_page_token)
request_data: Optional[dict] = self.prepare_request_payload(context, next_page_token)
headers: Dict[str, str] = self.http_headers

authenticator = self.authenticator
if authenticator:
headers.update(authenticator.auth_headers or {})
params.update(authenticator.auth_params or {})

if "propertiesWithHistory" in params or "properties" in params:
properties_key: str = "propertiesWithHistory" if "propertiesWithHistory" in params else "properties"
if isinstance(params[properties_key], list):
requests_list: List[requests.PreparedRequest] = [
self.requests_session.prepare_request(
requests.Request(
method=http_method,
url=url,
params={**params, properties_key: chunk},
headers=headers,
json=request_data,
)
)
for chunk in params[properties_key]
]
return requests_list
request: requests.PreparedRequest = cast(
requests.PreparedRequest,
self.requests_session.prepare_request(
requests.Request(
method=http_method,
url=url,
params=params,
headers=headers,
json=request_data,
),
),
)
return request

def stitch_responses(self, responses: List[requests.Response]) -> requests.Response:
"""Stitch together responses from a list of requests."""
if not responses:
raise ValueError("No responses to stitch")

stitched_response = responses[0]
stitched_response_json = stitched_response.json()

for response in responses[1:]:
response_json = response.json()
for key in response_json.keys() & {"properties", "propertiesWithHistory"}:
stitched_response_json[key].update(response_json[key])
# Clear the response content to free memory
response._content = None

stitched_response._content = json.dumps(stitched_response_json).encode('utf-8')
return stitched_response

def request_records(self, context):
"""Request records from REST endpoint(s), returning response records."""
next_page_token = None
Expand All @@ -95,7 +158,14 @@ def request_records(self, context):
prepared_request = self.prepare_request(
context, next_page_token=next_page_token
)
resp = decorated_request(prepared_request, context)
if prepared_request:
if isinstance(prepared_request, list):
resp = self.get_multiple_requests_by_parameter_splitting(context, decorated_request, prepared_request)
else:
resp = decorated_request(prepared_request, context)
self.validate_response(resp)
else:
raise RuntimeError(f"No response from {self.name} stream")
for row in self.parse_response(resp):
yield row
previous_token = copy.deepcopy(next_page_token)
Expand All @@ -109,6 +179,17 @@ def request_records(self, context):
)
finished = not next_page_token

def get_multiple_requests_by_parameter_splitting(self, context, decorated_request, prepared_request):
responses_list: List[requests.Response] = []
for req in prepared_request:
response = decorated_request(req, context)
self.validate_response(response)
responses_list.append(response)
stitched_response = self.stitch_responses(responses_list)
# Clear the responses list to free memory
responses_list.clear()
return stitched_response

@property
def authenticator(self) -> OAuth2Authenticator:
"""Return a new authenticator object."""
Expand Down Expand Up @@ -178,7 +259,9 @@ def validate_response(self, response: requests.Response) -> None:
curl_command = curlify.to_curl(response.request)
logging.error(f"Response code: {response.status_code}, info: {response.text}")
logging.error(f"CURL command for failed request: {curl_command}")
raise RetriableAPIError(msg)
logging.error("Sleeping for 60 seconds before retrying...")
time.sleep(60)
butkeraites-hotglue marked this conversation as resolved.
Show resolved Hide resolved
raise RetriableAPIError(msg)

elif 400 <= response.status_code < 500:
msg = (
Expand Down Expand Up @@ -294,14 +377,15 @@ def finalize_state_progress_markers(stream_or_partition_state: dict) -> Optional
def request_decorator(self, func):
"""Instantiate a decorator for handling request failures."""
decorator = backoff.on_exception(
self.backoff_wait_generator,
backoff.expo,
(
RetriableAPIError,
requests.exceptions.ReadTimeout,
requests.exceptions.ConnectionError,
ProtocolError
),
max_tries=self.backoff_max_tries,
max_tries=8,
factor=3,
on_backoff=self.backoff_handler,
)(func)
return decorator
Expand Down Expand Up @@ -369,10 +453,3 @@ def get_url_params(
params.update(next_page_token)
return params

def backoff_wait_generator(self):
"""The wait generator used by the backoff decorator on request failure. """
return backoff.expo(factor=3)

def backoff_max_tries(self) -> int:
"""The number of attempts before giving up when retrying requests."""
return 8
4 changes: 2 additions & 2 deletions tap_hubspot_beta/client_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ def get_url_params(

def post_process(self, row: dict, context: Optional[dict]) -> dict:
"""As needed, append or transform raw data to match expected structure."""
if self.properties_url:
if self.properties_url and row:
if row.get("properties"):
for name, value in row.get("properties", {}).items():
row[name] = value.get("value")
del row["properties"]
for field in self.datetime_fields:
if row.get(field) is not None:
if row and row.get(field) is not None:
if row.get(field) in [0, ""]:
row[field] = None
else:
Expand Down
9 changes: 5 additions & 4 deletions tap_hubspot_beta/client_v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,11 @@ def get_url_params(
if self.properties_url:
# requesting either properties or properties with history
# if we send both it returns an error saying the url is too long
if params.get("propertiesWithHistory"):
params["propertiesWithHistory"] = ",".join(self.selected_properties)
else:
params["properties"] = ",".join(self.selected_properties)
properties_key = "propertiesWithHistory" if params.get("propertiesWithHistory") else "properties"
params[properties_key] = [
",".join(self.selected_properties[i:i + 100])
for i in range(0, len(self.selected_properties), 100)
]
if next_page_token:
params["after"] = next_page_token
if self.name == "forms":
Expand Down
57 changes: 49 additions & 8 deletions tap_hubspot_beta/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,7 @@ def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
"""Return a context dictionary for child streams."""
return {
"form_id": record["id"],
"formType": record["formType"],
}


Expand All @@ -574,10 +575,21 @@ class FormSubmissionsStream(hubspotV1Stream):
th.Property("submittedAt", th.DateTimeType),
).to_dict()

def request_records(self, context):
# https://developers.hubspot.com/beta-docs/reference/api/marketing/forms/v1
# this endpoint only works for the formType bellow
if context.get("formType", "").lower() in ["hubspot", "flow", "captured"]:
records = super().request_records(context)
yield from records
else:
self.logger.warning(f"Skiping submissions for form_id={context.get('form_id')}. formType={context.get('formType')}. url={self.get_url(context)}")
yield None

def post_process(self, row: dict, context: Optional[dict]) -> dict:
"""As needed, append or transform raw data to match expected structure."""
row = super().post_process(row, context)
row["form_id"] = context.get("form_id")
if row:
row["form_id"] = context.get("form_id")
return row


Expand Down Expand Up @@ -906,9 +918,9 @@ def metadata(self):
return new_metadata

def get_url_params(self, context, next_page_token):
fields = "id,createdAt,updatedAt,archived,archivedAt"
params = super().get_url_params(context, next_page_token)
if len(urlencode(params)) > 3000:
params["properties"] = "id,createdAt,updatedAt,archived,archivedAt"
params = get_url_params_properties_if_too_long(params, fields)
return params

def post_process(self, row, context):
Expand Down Expand Up @@ -1001,9 +1013,9 @@ class ArchivedDealsStream(hubspotV3Stream):
]

def get_url_params(self, context, next_page_token):
fields = "id,createdAt,updatedAt,archivedAt,dealname,hubspot_owner_id,amount,hs_mrr,dealstage,pipeline,dealtype,hs_createdate,createdate,hs_lastmodifieddate,closedate,archived"
params = super().get_url_params(context, next_page_token)
if len(urlencode(params)) > 3000:
params["properties"] = "id,createdAt,updatedAt,archivedAt,dealname,hubspot_owner_id,amount,hs_mrr,dealstage,pipeline,dealtype,hs_createdate,createdate,hs_lastmodifieddate,closedate,archived"
params = get_url_params_properties_if_too_long(params, fields)
return params

@property
Expand Down Expand Up @@ -1184,9 +1196,9 @@ def _write_record_message(self, record: dict) -> None:
singer.write_message(record_message)

def get_url_params(self, context, next_page_token):
fields = "id,createdAt,updatedAt,archived,archivedAt"
params = super().get_url_params(context, next_page_token)
if len(urlencode(params)) > 3000:
params["properties"] = "id,createdAt,updatedAt,archived,archivedAt"
params = get_url_params_properties_if_too_long(params, fields)
return params

def post_process(self, row, context):
Expand Down Expand Up @@ -2531,4 +2543,33 @@ class GeolocationSummaryMonthlyStream(FormsSummaryMonthlyStream):
th.Property("breakdowns", th.CustomType({"type": ["array", "string"]})),
th.Property("start_date", th.DateType),
th.Property("end_date", th.DateType),
).to_dict()
).to_dict()

def get_url_params_properties_if_too_long(params, fields):
def is_url_too_long(params):
return len(urlencode(params)) > 3000

# Determine the key to use for properties
properties_key = None
for key in ["propertiesWithHistory", "properties"]:
if key in params:
properties_key = key
break

# If a properties key is found
if properties_key:
# If the properties value is a list, check each chunk as if it was a separate request
if isinstance(params[properties_key], list):
for chunk in params[properties_key]:
params_chunk = {**params, properties_key: chunk}
if is_url_too_long(params_chunk):
params[properties_key] = fields
return params
# If the URL is too long, set the properties to the provided fields
elif is_url_too_long(params):
params[properties_key] = fields
# If no properties key is found and the URL is too long, set the properties to the provided fields
elif is_url_too_long(params):
params["properties"] = fields

return params
7 changes: 4 additions & 3 deletions tap_hubspot_beta/tap.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""hubspot tap class."""

from typing import List
from typing import Generator, List

from singer_sdk import Stream, Tap
from singer_sdk import typing as th
Expand Down Expand Up @@ -222,9 +222,10 @@ def __init__(
th.Property("access_token", th.StringType),
).to_dict()

def discover_streams(self) -> List[Stream]:
def discover_streams(self) -> Generator[Stream, None, None]:
"""Return a list of discovered streams."""
return [stream_class(tap=self) for stream_class in STREAM_TYPES]
for stream_class in STREAM_TYPES:
yield stream_class(tap=self)

@property
def catalog_dict(self) -> dict:
Expand Down