Skip to content

Commit

Permalink
Add more exception handled, as observed by restarting webapi while se…
Browse files Browse the repository at this point in the history
…nding a batch (#44)
  • Loading branch information
pavius authored Jul 23, 2020
1 parent 62ac862 commit 64cbce7
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 7 deletions.
75 changes: 74 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os.path
import unittest
import sys
import time

import future.utils

Expand Down Expand Up @@ -643,3 +643,76 @@ def _object_contents(idx):
self.assertEqual(404, response.status_code)
else:
self.assertEqual(200, response.status_code)


class TestConnectonErrorRecovery(Test):

def setUp(self):
super(TestConnectonErrorRecovery, self).setUp()

self._object_dir = '/v3io-py-test-connection-error'
self._object_path = self._object_dir + '/object.txt'

self._emd_path = 'some_dir/v3io-py-test-emd'
self._delete_dir(self._emd_path)

# clean up
self._delete_dir(self._object_dir)

@unittest.skip("Manually executed")
def test_object(self):

for i in range(100):
body = 'iteration {}'.format(i)

if i == 10:
self._restart_webapi()

# put contents to some object
self._client.put_object(container=self._container,
path=self._object_path,
body=body)

response = self._client.get_object(container=self._container,
path=self._object_path)

if not isinstance(response.body, str):
response.body = response.body.decode('utf-8')

self.assertEqual(response.body, body)

time.sleep(0.1)

@unittest.skip("Manually executed")
def test_emd_batch(self):
items = {
'bob': {'age': 42, 'feature': 'mustache'},
'linda': {'age': 41, 'feature': 'singing'},
'louise': {'age': 9, 'feature': 'bunny ears'},
'tina': {'age': 14, 'feature': 'butts'},
}

# put the item in a batch
for item_key, item_attributes in future.utils.viewitems(items):
self._client.batch.put_item(container=self._container,
path=v3io.common.helpers.url_join(self._emd_path, item_key),
attributes=item_attributes)

responses = self._client.batch.wait()
for response in responses:
self.assertEqual(200, response.status_code)

self._restart_webapi()

for item_key in items.keys():
self._client.batch.get_item(container=self._container,
path=v3io.common.helpers.url_join(self._emd_path, item_key),
attribute_names=['__size', 'age'])

responses = self._client.batch.wait()
for response in responses:
self.assertEqual(200, response.status_code)

def _restart_webapi(self):
print('Restart webapi now')
time.sleep(15)
31 changes: 25 additions & 6 deletions v3io/dataplane/transport/httpclient.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import ssl
import sys
import http.client
import socket

import v3io.dataplane.response
import v3io.dataplane.request
Expand All @@ -25,12 +26,12 @@ def __init__(self, logger, endpoint=None, max_connections=None, timeout=None, ve

# python 2 and 3 have different exceptions
if sys.version_info[0] >= 3:
self._remote_disconnect_exception = http.client.RemoteDisconnected
self._disconnection_exceptions = (BrokenPipeError, http.client.CannotSendRequest, http.client.RemoteDisconnected)
self._wait_response_exceptions = (http.client.RemoteDisconnected, ConnectionResetError, ConnectionRefusedError)
self._send_request_exceptions = (BrokenPipeError, http.client.CannotSendRequest, http.client.RemoteDisconnected)
self._get_status_and_headers = self._get_status_and_headers_py3
else:
self._remote_disconnect_exception = http.client.BadStatusLine
self._disconnection_exceptions = (http.client.CannotSendRequest, http.client.BadStatusLine)
self._wait_response_exceptions = (http.client.BadStatusLine, socket.error)
self._send_request_exceptions = (http.client.CannotSendRequest, http.client.BadStatusLine)
self._get_status_and_headers = self._get_status_and_headers_py2

def restart(self):
Expand Down Expand Up @@ -86,17 +87,30 @@ def wait_response(self, request, raise_for_status=None, num_retries=1):
# return the response
return response

except self._remote_disconnect_exception as e:
except self._wait_response_exceptions as e:
if num_retries == 0:
self._logger.warn_with('Remote disconnected while waiting for response and ran out of retries',
e=type(e),
connection_idx=connection_idx)

raise e

self._logger.info_with('Remote disconnected while waiting for response',
retries_left=num_retries,
connection_idx=connection_idx)

num_retries -= 1

# create a connection
connection = self._recreate_connection_at_index(connection_idx)

# re-send the request on the connection
request = self._send_request_on_connection(request, connection_idx)
except BaseException as e:
self._logger.warn_with('Unhandled exception while waiting for response',
e=type(e),
connection_idx=connection_idx)
raise e

def _send_request_on_connection(self, request, connection_idx):
self.log('Tx',
Expand All @@ -110,11 +124,16 @@ def _send_request_on_connection(self, request, connection_idx):

try:
connection.request(request.method, request.path, request.body, request.headers)
except self._disconnection_exceptions:
except self._send_request_exceptions as e:
self._logger.info_with('Disconnected while attempting to send. Recreating connection', e=type(e))

connection = self._recreate_connection_at_index(connection_idx)

# re-request
connection.request(request.method, request.path, request.body, request.headers)
except BaseException as e:
self._logger.warn_with('Unhandled exception while sending request', e=type(e))
raise e

return request

Expand Down

0 comments on commit 64cbce7

Please sign in to comment.