From 038acbebeac4eea440f984d6fbebb27c8eec15ea Mon Sep 17 00:00:00 2001 From: Kethan Cherukuri <105211331+kethan1122@users.noreply.github.com> Date: Mon, 5 Jun 2023 13:08:48 +0530 Subject: [PATCH] Backoff for Protocol error and chunked encoding errors (#131) * backoff protocol error and chunked encoding errors * add comments * changelog and version bump * minor to patch * refator http.py for backoffs * use imported connectonerror instead of builtin * add backoff for requests_metrics_path --- .circleci/config.yml | 2 +- CHANGELOG.md | 2 ++ setup.py | 2 +- tap_zendesk/__init__.py | 35 ++++++++++++++++++++++++++++------ tap_zendesk/http.py | 24 ++++++++--------------- test/unittests/test_http.py | 38 +++++++++++++++++++++++++++++++++++++ 6 files changed, 79 insertions(+), 24 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 7ee07db..22badd5 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -33,7 +33,7 @@ jobs: name: 'pylint' command: | source /usr/local/share/virtualenvs/tap-zendesk/bin/activate - pylint tap_zendesk -d missing-docstring,invalid-name,line-too-long,too-many-locals,too-few-public-methods,fixme,stop-iteration-return,too-many-branches,useless-import-alias,no-else-return,logging-not-lazy + pylint tap_zendesk -d missing-docstring,invalid-name,line-too-long,too-many-locals,too-few-public-methods,fixme,stop-iteration-return,too-many-branches,useless-import-alias,no-else-return,logging-not-lazy,redefined-builtin - run: name: 'unittests' when: always diff --git a/CHANGELOG.md b/CHANGELOG.md index a356b6f..7c6f0db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ # Changelog +## 2.0.1 + * Adds backoff/retry for `ProtocolError` and `ChunkedEncodingError` [#131](https://github.com/singer-io/tap-zendesk/pull/131) ## 2.0.0 * Incremental Exports API implementation for User's stream [#127](https://github.com/singer-io/tap-zendesk/pull/127) ## 1.7.6 diff --git a/setup.py b/setup.py index c2b4886..9d2a9c7 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup(name='tap-zendesk', - version='2.0.0', + version='2.0.1', description='Singer.io tap for extracting data from the Zendesk API', author='Stitch', url='https://singer.io', diff --git a/tap_zendesk/__init__.py b/tap_zendesk/__init__.py index 4eeb195..44275bc 100755 --- a/tap_zendesk/__init__.py +++ b/tap_zendesk/__init__.py @@ -6,8 +6,11 @@ import requests from requests import Session from requests.adapters import HTTPAdapter +from requests.exceptions import Timeout, ChunkedEncodingError +from urllib3.exceptions import ProtocolError import singer from singer import metadata, metrics as singer_metrics +import backoff from tap_zendesk import metrics as zendesk_metrics from tap_zendesk.discover import discover_streams from tap_zendesk.streams import STREAMS @@ -36,6 +39,12 @@ # patch Session.request to record HTTP request metrics request = Session.request + +@backoff.on_exception(backoff.expo, + (ConnectionError, ConnectionResetError, Timeout, ChunkedEncodingError, + ProtocolError), + max_tries=5, + factor=2) def request_metrics_patch(self, method, url, **kwargs): with singer_metrics.http_request_timer(None): response = request(self, method, url, **kwargs) @@ -45,7 +54,10 @@ def request_metrics_patch(self, method, url, **kwargs): response.headers.get('X-Request-Id', 'Not present')) return response + Session.request = request_metrics_patch + + # end patch def do_discover(client, config): @@ -54,9 +66,11 @@ def do_discover(client, config): json.dump(catalog, sys.stdout, indent=2) LOGGER.info("Finished discover") + def stream_is_selected(mdata): return mdata.get((), {}).get('selected', False) + def get_selected_streams(catalog): selected_stream_names = [] for stream in catalog.streams: @@ -70,15 +84,18 @@ def get_selected_streams(catalog): 'tickets': ['ticket_audits', 'ticket_metrics', 'ticket_comments'] } + def get_sub_stream_names(): sub_stream_names = [] for parent_stream in SUB_STREAMS: sub_stream_names.extend(SUB_STREAMS[parent_stream]) return sub_stream_names + class DependencyException(Exception): pass + def validate_dependencies(selected_stream_ids): errs = [] msg_tmpl = ("Unable to extract {0} data. " @@ -92,13 +109,14 @@ def validate_dependencies(selected_stream_ids): if errs: raise DependencyException(" ".join(errs)) + def populate_class_schemas(catalog, selected_stream_names): for stream in catalog.streams: if stream.tap_stream_id in selected_stream_names: STREAMS[stream.tap_stream_id].stream = stream -def do_sync(client, catalog, state, config): +def do_sync(client, catalog, state, config): selected_stream_names = get_selected_streams(catalog) validate_dependencies(selected_stream_names) populate_class_schemas(catalog, selected_stream_names) @@ -121,7 +139,6 @@ def do_sync(client, catalog, state, config): # else: # LOGGER.info("%s: Starting", stream_name) - key_properties = metadata.get(mdata, (), 'table-key-properties') singer.write_schema(stream_name, stream.schema.to_dict(), key_properties) @@ -133,7 +150,8 @@ def do_sync(client, catalog, state, config): sub_stream = STREAMS[sub_stream_name].stream sub_mdata = metadata.to_map(sub_stream.metadata) sub_key_properties = metadata.get(sub_mdata, (), 'table-key-properties') - singer.write_schema(sub_stream.tap_stream_id, sub_stream.schema.to_dict(), sub_key_properties) + singer.write_schema(sub_stream.tap_stream_id, sub_stream.schema.to_dict(), + sub_key_properties) # parent stream will sync sub stream if stream_name in all_sub_stream_names: @@ -150,6 +168,7 @@ def do_sync(client, catalog, state, config): LOGGER.info("Finished sync") zendesk_metrics.log_aggregate_rates() + def oauth_auth(args): if not set(OAUTH_CONFIG_KEYS).issubset(args.config.keys()): LOGGER.debug("OAuth authentication unavailable.") @@ -161,6 +180,7 @@ def oauth_auth(args): "oauth_token": args.config['access_token'], } + def api_token_auth(args): if not set(API_TOKEN_CONFIG_KEYS).issubset(args.config.keys()): LOGGER.debug("API Token authentication unavailable.") @@ -173,6 +193,7 @@ def api_token_auth(args): "token": args.config['api_token'] } + def get_session(config): """ Add partner information to requests Session object if specified in the config. """ if not all(k in config for k in ["marketplace_name", @@ -184,10 +205,12 @@ def get_session(config): # https://github.com/facetoe/zenpy/blob/master/docs/zenpy.rst#usage session.mount("https://", HTTPAdapter(**Zenpy.http_adapter_kwargs())) session.headers["X-Zendesk-Marketplace-Name"] = config.get("marketplace_name", "") - session.headers["X-Zendesk-Marketplace-Organization-Id"] = str(config.get("marketplace_organization_id", "")) + session.headers["X-Zendesk-Marketplace-Organization-Id"] = str( + config.get("marketplace_organization_id", "")) session.headers["X-Zendesk-Marketplace-App-Id"] = str(config.get("marketplace_app_id", "")) return session + @singer.utils.handle_top_exception(LOGGER) def main(): parsed_args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS) @@ -197,11 +220,11 @@ def main(): if config_request_timeout and float(config_request_timeout): request_timeout = float(config_request_timeout) else: - request_timeout = REQUEST_TIMEOUT # If value is 0, "0", "" or not passed then it sets default to 300 seconds. + request_timeout = REQUEST_TIMEOUT # If value is 0, "0", "" or not passed then it sets default to 300 seconds. # OAuth has precedence creds = oauth_auth(parsed_args) or api_token_auth(parsed_args) session = get_session(parsed_args.config) - client = Zenpy(session=session, timeout=request_timeout, **creds) # Pass request timeout + client = Zenpy(session=session, timeout=request_timeout, **creds) # Pass request timeout if not client: LOGGER.error("""No suitable authentication keys provided.""") diff --git a/tap_zendesk/http.py b/tap_zendesk/http.py index 2ad9666..3f44006 100644 --- a/tap_zendesk/http.py +++ b/tap_zendesk/http.py @@ -2,7 +2,8 @@ import backoff import requests import singer -from requests.exceptions import Timeout, HTTPError +from requests.exceptions import Timeout, HTTPError, ChunkedEncodingError, ConnectionError +from urllib3.exceptions import ProtocolError @@ -107,17 +108,11 @@ def is_fatal(exception): sleep(sleep_time) return False - return 400 <=status_code < 500 + if status_code == 409: + # retry ZendeskConflictError for at-least 10 times + return False -def should_retry_error(exception): - """ - Return true if exception is required to retry otherwise return false - """ - if isinstance(exception, ZendeskConflictError): - return True - if isinstance(exception,Exception) and isinstance(exception.args[0][1],ConnectionResetError): - return True - return False + return 400 <=status_code < 500 def raise_for_error(response): """ Error handling method which throws custom error. Class for each error defined above which extends `ZendeskError`. @@ -140,16 +135,13 @@ def raise_for_error(response): response.status_code, {}).get("raise_exception", ZendeskError) raise exc(message, response) from None -@backoff.on_exception(backoff.expo, - (ZendeskConflictError), - max_tries=10, - giveup=lambda e: not should_retry_error(e)) + @backoff.on_exception(backoff.expo, (HTTPError, ZendeskError), # Added support of backoff for all unhandled status codes. max_tries=10, giveup=is_fatal) @backoff.on_exception(backoff.expo, - (ConnectionError, Timeout),#As ConnectionError error and timeout error does not have attribute status_code, + (ConnectionError, ConnectionResetError, Timeout, ChunkedEncodingError, ProtocolError),#As ConnectionError error and timeout error does not have attribute status_code, max_tries=5, # here we added another backoff expression. factor=2) def call_api(url, request_timeout, params, headers): diff --git a/test/unittests/test_http.py b/test/unittests/test_http.py index d2cbe0a..f8d8059 100644 --- a/test/unittests/test_http.py +++ b/test/unittests/test_http.py @@ -2,6 +2,8 @@ from unittest.mock import MagicMock, Mock, patch from tap_zendesk import http, streams import requests +from urllib3.exceptions import ProtocolError +from requests.exceptions import ChunkedEncodingError, ConnectionError import zenpy @@ -479,3 +481,39 @@ def test_get_cursor_based_handles_503(self, mock_get, mock_sleep): # Verify the request retry 10 times self.assertEqual(mock_get.call_count, 10) + + @patch("requests.get") + def test_call_api_handles_protocol_error(self, mock_get, mock_sleep): + """Check whether the request backoff properly for call_api method for 5 times in case of + Protocol error""" + mock_get.side_effect = ProtocolError + + with self.assertRaises(ProtocolError) as _: + http.call_api( + url="some_url", request_timeout=300, params={}, headers={} + ) + self.assertEqual(mock_get.call_count, 5) + + @patch("requests.get") + def test_call_api_handles_chunked_encoding_error(self, mock_get, mock_sleep): + """Check whether the request backoff properly for call_api method for 5 times in case of + ChunkedEncoding error""" + mock_get.side_effect = ChunkedEncodingError + + with self.assertRaises(ChunkedEncodingError) as _: + http.call_api( + url="some_url", request_timeout=300, params={}, headers={} + ) + self.assertEqual(mock_get.call_count, 5) + + @patch("requests.get") + def test_call_api_handles_connection_reset_error(self, mock_get, mock_sleep): + """Check whether the request backoff properly for call_api method for 5 times in case of + ConnectionResetError error""" + mock_get.side_effect = ConnectionResetError + + with self.assertRaises(ConnectionResetError) as _: + http.call_api( + url="some_url", request_timeout=300, params={}, headers={} + ) + self.assertEqual(mock_get.call_count, 5)