diff --git a/tests/test_client.py b/tests/test_client.py index 9b71bff..c8242eb 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,6 +1,6 @@ import os.path import unittest -import sys +import time import future.utils @@ -643,3 +643,76 @@ def _object_contents(idx): self.assertEqual(404, response.status_code) else: self.assertEqual(200, response.status_code) + + +class TestConnectonErrorRecovery(Test): + + def setUp(self): + super(TestConnectonErrorRecovery, self).setUp() + + self._object_dir = '/v3io-py-test-connection-error' + self._object_path = self._object_dir + '/object.txt' + + self._emd_path = 'some_dir/v3io-py-test-emd' + self._delete_dir(self._emd_path) + + # clean up + self._delete_dir(self._object_dir) + + @unittest.skip("Manually executed") + def test_object(self): + + for i in range(100): + body = 'iteration {}'.format(i) + + if i == 10: + self._restart_webapi() + + # put contents to some object + self._client.put_object(container=self._container, + path=self._object_path, + body=body) + + response = self._client.get_object(container=self._container, + path=self._object_path) + + if not isinstance(response.body, str): + response.body = response.body.decode('utf-8') + + self.assertEqual(response.body, body) + + time.sleep(0.1) + + @unittest.skip("Manually executed") + def test_emd_batch(self): + items = { + 'bob': {'age': 42, 'feature': 'mustache'}, + 'linda': {'age': 41, 'feature': 'singing'}, + 'louise': {'age': 9, 'feature': 'bunny ears'}, + 'tina': {'age': 14, 'feature': 'butts'}, + } + + # put the item in a batch + for item_key, item_attributes in future.utils.viewitems(items): + self._client.batch.put_item(container=self._container, + path=v3io.common.helpers.url_join(self._emd_path, item_key), + attributes=item_attributes) + + responses = self._client.batch.wait() + for response in responses: + self.assertEqual(200, response.status_code) + + self._restart_webapi() + + for item_key in items.keys(): + self._client.batch.get_item(container=self._container, + path=v3io.common.helpers.url_join(self._emd_path, item_key), + attribute_names=['__size', 'age']) + + responses = self._client.batch.wait() + for response in responses: + self.assertEqual(200, response.status_code) + + def _restart_webapi(self): + print('Restart webapi now') + time.sleep(15) diff --git a/v3io/dataplane/transport/httpclient.py b/v3io/dataplane/transport/httpclient.py index 0e8a0d4..be2fb7d 100644 --- a/v3io/dataplane/transport/httpclient.py +++ b/v3io/dataplane/transport/httpclient.py @@ -1,6 +1,7 @@ import ssl import sys import http.client +import socket import v3io.dataplane.response import v3io.dataplane.request @@ -25,12 +26,12 @@ def __init__(self, logger, endpoint=None, max_connections=None, timeout=None, ve # python 2 and 3 have different exceptions if sys.version_info[0] >= 3: - self._remote_disconnect_exception = http.client.RemoteDisconnected - self._disconnection_exceptions = (BrokenPipeError, http.client.CannotSendRequest, http.client.RemoteDisconnected) + self._wait_response_exceptions = (http.client.RemoteDisconnected, ConnectionResetError, ConnectionRefusedError) + self._send_request_exceptions = (BrokenPipeError, http.client.CannotSendRequest, http.client.RemoteDisconnected) self._get_status_and_headers = self._get_status_and_headers_py3 else: - self._remote_disconnect_exception = http.client.BadStatusLine - self._disconnection_exceptions = (http.client.CannotSendRequest, http.client.BadStatusLine) + self._wait_response_exceptions = (http.client.BadStatusLine, socket.error) + self._send_request_exceptions = (http.client.CannotSendRequest, http.client.BadStatusLine) self._get_status_and_headers = self._get_status_and_headers_py2 def restart(self): @@ -86,10 +87,18 @@ def wait_response(self, request, raise_for_status=None, num_retries=1): # return the response return response - except self._remote_disconnect_exception as e: + except self._wait_response_exceptions as e: if num_retries == 0: + self._logger.warn_with('Remote disconnected while waiting for response and ran out of retries', + e=type(e), + connection_idx=connection_idx) + raise e + self._logger.info_with('Remote disconnected while waiting for response', + retries_left=num_retries, + connection_idx=connection_idx) + num_retries -= 1 # create a connection @@ -97,6 +106,11 @@ def wait_response(self, request, raise_for_status=None, num_retries=1): # re-send the request on the connection request = self._send_request_on_connection(request, connection_idx) + except BaseException as e: + self._logger.warn_with('Unhandled exception while waiting for response', + e=type(e), + connection_idx=connection_idx) + raise e def _send_request_on_connection(self, request, connection_idx): self.log('Tx', @@ -110,11 +124,16 @@ def _send_request_on_connection(self, request, connection_idx): try: connection.request(request.method, request.path, request.body, request.headers) - except self._disconnection_exceptions: + except self._send_request_exceptions as e: + self._logger.info_with('Disconnected while attempting to send. Recreating connection', e=type(e)) + connection = self._recreate_connection_at_index(connection_idx) # re-request connection.request(request.method, request.path, request.body, request.headers) + except BaseException as e: + self._logger.warn_with('Unhandled exception while sending request', e=type(e)) + raise e return request