Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds new backoff generator and logging #13

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ python = "<3.11,>=3.7.1"
requests = "^2.25.1"
singer-sdk = "^0.4.4"
"backports.cached-property" = "^1.0.1"
curlify = "^2.2.1"

[tool.poetry.dev-dependencies]
pytest = "^6.2.5"
Expand Down
46 changes: 42 additions & 4 deletions tap_hubspot_beta/client_base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""REST client handling, including hubspotStream base class."""
import copy
import logging
import curlify

import requests
import backoff
Expand Down Expand Up @@ -137,21 +138,51 @@ def selected_properties(self):
selected_properties.append(key[-1])
return selected_properties

def log_rate_limit(self, resp):
    """Log HubSpot's rate-limit headers and the body of a throttled response.

    Args:
        resp: The HTTP response to inspect. Its ``headers``, ``url`` and
            ``content`` attributes are read.

    Each header is logged even when it is missing from the response; in that
    case ``headers.get`` yields ``None`` and that is what gets logged.
    """
    rate_limit_headers = (
        "x-hubspot-ratelimit-interval-milliseconds",
        "x-hubspot-ratelimit-max",
        "x-hubspot-ratelimit-remaining",
        "x-hubspot-ratelimit-secondly",
        "x-hubspot-ratelimit-secondly-remaining",
    )
    for header in rate_limit_headers:
        # Lazy %-style arguments: the message is only rendered when the
        # record is actually emitted by a handler.
        self.logger.info(
            "Header: %s, value: %s", header, resp.headers.get(header)
        )
    # The message text says "429" unconditionally; within the visible code
    # this method is only invoked for 429 responses.
    self.logger.info(
        "429 response from path: %s - %s", resp.url, resp.content
    )

def validate_response(self, response: requests.Response) -> None:
    """Validate an HTTP response and raise on error statuses.

    Args:
        response: The response returned for the request that was just sent.

    Raises:
        RetriableAPIError: For 5xx statuses, for 400, 401 and 104, and for
            429 (after the rate-limit headers have been logged).
        FatalAPIError: For every other status strictly between 400 and 500.

    Returns:
        None when the status is not treated as an error.
    """
    status = response.status_code

    # Rate limiting gets dedicated diagnostics (the rate-limit headers)
    # instead of the generic failed-request logging below. 429 is not part
    # of the retriable set that follows, so checking it first does not
    # change which branch any status ends up in.
    if status == 429:
        self.log_rate_limit(response)
        raise RetriableAPIError(
            f"429 Too Many Requests, response {response.text}"
        )

    # NOTE(review): 104 is not a standard HTTP status code (it looks like the
    # "connection reset" errno); kept because the original condition lists it.
    is_retriable = 500 <= status < 600 or status in (400, 401, 104)
    is_fatal = not is_retriable and 400 < status < 500
    if not (is_retriable or is_fatal):
        return

    # The retriable branch has always been labelled "Server Error", even for
    # 400/401; the wording is left untouched so existing messages stay stable.
    label = "Server" if is_retriable else "Client"
    msg = f"{status} {label} Error: {response.reason} for path: {self.path}"

    # NOTE(review): the curl command is built from the full prepared request,
    # so it may include credential-bearing headers - confirm these logs are
    # handled as sensitive.
    curl_command = curlify.to_curl(response.request)
    self.logger.error("Response code: %s, info: %s", status, response.text)
    self.logger.error("CURL command for failed request: %s", curl_command)

    # Raise the exception type directly with a string message. Wrapping a
    # RetriableAPIError instance inside FatalAPIError would put an exception
    # object where the fatal error's message belongs.
    error_cls = RetriableAPIError if is_retriable else FatalAPIError
    raise error_cls(f"Msg {msg}, response {response.text}")

@staticmethod
def extract_type(field):
Expand Down Expand Up @@ -248,6 +279,13 @@ def finalize_state_progress_markers(stream_or_partition_state: dict) -> Optional
return
finalize_state_progress_markers(state)

def backoff_wait_generator(self):
    """Build a ``backoff`` wait generator with a constant 15-second interval."""
    fixed_interval = 15
    wait_gen = backoff.constant(interval=fixed_interval)
    return wait_gen

@property
def backoff_max_tries(self):
    """Return the retry limit, fixed at 10.

    Presumably read by the request decorator when configuring ``backoff``;
    the consumer is not visible here, so confirm against the caller.
    """
    max_attempts = 10
    return max_attempts

def request_decorator(self, func):
"""Instantiate a decorator for handling request failures."""
decorator = backoff.on_exception(
Expand Down
24 changes: 12 additions & 12 deletions tap_hubspot_beta/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -1054,17 +1054,17 @@ def replication_key(self):
return "updatedAt"

schema = th.PropertiesList(
th.Property("listId", th.NumberType()),
th.Property("listVersion", th.NumberType()),
th.Property("createdAt", th.DateTimeType()),
th.Property("updatedAt", th.DateTimeType()),
th.Property("filtersUpdateAt", th.DateTimeType()),
th.Property("processingStatus", th.StringType()),
th.Property("createdById", th.NumberType()),
th.Property("updatedById", th.NumberType()),
th.Property("processingType", th.StringType()),
th.Property("objectTypeId", th.StringType()),
th.Property("name", th.StringType()),
th.Property("listId", th.StringType),
th.Property("listVersion", th.NumberType),
th.Property("createdAt", th.DateTimeType),
th.Property("updatedAt", th.DateTimeType),
th.Property("filtersUpdateAt", th.DateTimeType),
th.Property("processingStatus", th.StringType),
th.Property("createdById", th.StringType),
th.Property("updatedById", th.StringType),
th.Property("processingType", th.StringType),
th.Property("objectTypeId", th.StringType),
th.Property("name", th.StringType),
th.Property("additionalProperties", th.CustomType({"type": ["object", "string"]})),
).to_dict()

Expand Down Expand Up @@ -1095,7 +1095,7 @@ class ListMembershipV3Stream(hubspotV3Stream):

schema = th.PropertiesList(
th.Property("results", th.CustomType({"type": ["array", "string"]})),
th.Property("list_id", th.IntegerType),
th.Property("list_id", th.StringType),
).to_dict()

def post_process(self, row, context):
Expand Down