Skip to content

Commit

Permalink
use helper functions to simplify tests, #1
Browse files Browse the repository at this point in the history
  • Loading branch information
ThrawnCA committed Sep 16, 2022
1 parent 4bb43bf commit 2502c5d
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 41 deletions.
75 changes: 43 additions & 32 deletions ckanext/archiver/tests/mock_remote_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from builtins import range
from builtins import object
from contextlib import contextmanager
from http.client import responses
from threading import Thread
from time import sleep
from wsgiref.simple_server import make_server
Expand All @@ -15,6 +16,16 @@
from functools import reduce


def _get_str_params(request):
""" Get parameters from the request. If 'str_params' is available,
use that, otherwise just use 'params'.
"""
if hasattr(request, 'str_params'):
return request.str_params
else:
return request.params


class MockHTTPServer(object):
"""
Mock HTTP server that can take the place of a remote server for testing
Expand Down Expand Up @@ -42,7 +53,7 @@ def serve(self, host='localhost', port_range=(8000, 9000)):
This uses context manager to make sure the server is stopped::
>>> with MockTestServer().serve() as addr:
... print urlopen('%s/?content=hello+world').read()
... print(urlopen('%s/?content=hello+world').read())
...
'hello world'
"""
Expand Down Expand Up @@ -112,27 +123,26 @@ class MockEchoTestServer(MockHTTPServer):

def __call__(self, environ, start_response):

from http.client import responses
from webob import Request
request = Request(environ)
status = int(request.str_params.get('status', '200'))
# if 'redirect' in redirect.str_params:
# params = dict([(key, value) for param in request.str_params \
status = int(_get_str_params(request).get('status', '200'))
# if 'redirect' in _get_str_params(redirect):
# params = dict([(key, value) for param in _get_str_params(request) \
# if key != 'redirect'])
# redirect_status = int(request.str_params['redirect'])
# status = int(request.str_params.get('status', '200'))
# redirect_status = int(_get_str_params(request)['redirect'])
# status = int(_get_str_params(request).get('status', '200'))
# resp = make_response(render_template('error.html'), redirect_status)
# resp.headers['Location'] = url_for(request.path, params)
# return resp
if 'content_var' in request.str_params:
content = request.str_params.get('content_var')
if 'content_var' in _get_str_params(request):
content = _get_str_params(request).get('content_var')
content = self.get_content(content)
elif 'content_long' in request.str_params:
elif 'content_long' in _get_str_params(request):
content = '*' * 1000001
else:
content = request.str_params.get('content', '')
if 'method' in request.str_params \
and request.method.lower() != request.str_params['method'].lower():
content = _get_str_params(request).get('content', '')
if 'method' in _get_str_params(request) \
and request.method.lower() != _get_str_params(request)['method'].lower():
content = ''
status = 405

Expand All @@ -141,14 +151,17 @@ def __call__(self, environ, start_response):

headers = [
item
for item in list(request.str_params.items())
for item in _get_str_params(request).items()
if item[0] not in ('content', 'status')
]
if 'length' in request.str_params:
cl = request.str_params.get('length')
if 'length' in _get_str_params(request):
cl = _get_str_params(request).get('length')
headers += [('Content-Length', cl)]
elif content and 'no-content-length' not in request.str_params:
headers += [('Content-Length', bytes(len(content)))]
elif content and 'no-content-length' not in _get_str_params(request):
# Python 2 with old WebOb wants bytes,
# Python 3 with new WebOb wants text,
# so both want 'str'
headers += [('Content-Length', str(len(content)))]
start_response(
'%d %s' % (status, responses[status]),
headers
Expand Down Expand Up @@ -187,20 +200,19 @@ def __init__(self, wms_version='1.3'):
super(MockWmsServer, self).__init__()

def __call__(self, environ, start_response):
from http.client import responses
from webob import Request
request = Request(environ)
status = int(request.str_params.get('status', '200'))
headers = {'Content-Type': 'text/plain'}
status = int(_get_str_params(request).get('status', '200'))
headers = [('Content-Type', 'text/plain')]
# e.g. params ?service=WMS&request=GetCapabilities&version=1.1.1
if request.str_params.get('service') != 'WMS':
if _get_str_params(request).get('service') != 'WMS':
status = 200
content = ERROR_WRONG_SERVICE
elif request.str_params.get('request') != 'GetCapabilities':
elif _get_str_params(request).get('request') != 'GetCapabilities':
status = 405
content = '"request" param wrong'
elif 'version' in request.str_params and \
request.str_params.get('version') != self.wms_version:
elif 'version' in _get_str_params(request) and \
_get_str_params(request).get('version') != self.wms_version:
status = 405
content = '"version" not compatible - need to be %s' % self.wms_version
elif self.wms_version == '1.1.1':
Expand All @@ -211,7 +223,7 @@ def __call__(self, environ, start_response):
content = get_file_content('wms_getcap_1.3.xml')
start_response(
'%d %s' % (status, responses[status]),
list(headers.items())
headers
)
return [content]

Expand All @@ -223,24 +235,23 @@ def __init__(self):
super(MockWfsServer, self).__init__()

def __call__(self, environ, start_response):
from http.client import responses
from webob import Request
request = Request(environ)
status = int(request.str_params.get('status', '200'))
headers = {'Content-Type': 'text/plain'}
status = int(_get_str_params(request).get('status', '200'))
headers = [('Content-Type', 'text/plain')]
# e.g. params ?service=WFS&request=GetCapabilities
if request.str_params.get('service') != 'WFS':
if _get_str_params(request).get('service') != 'WFS':
status = 200
content = ERROR_WRONG_SERVICE
elif request.str_params.get('request') != 'GetCapabilities':
elif _get_str_params(request).get('request') != 'GetCapabilities':
status = 405
content = '"request" param wrong'
else:
status = 200
content = get_file_content('wfs_getcap.xml')
start_response(
'%d %s' % (status, responses[status]),
list(headers.items())
headers
)
return [content]

Expand Down
22 changes: 13 additions & 9 deletions ckanext/archiver/tests/test_archiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

from ckan import model
from ckan import plugins
from ckan.logic import get_action
from ckan.tests import factories as ckan_factories

from ckanext.archiver import model as archiver_model
Expand Down Expand Up @@ -187,7 +186,7 @@ def test_bad_url(self):
def test_resource_hash_and_content_length(self, client):
url = client + '/?status=200&content=test&content-type=csv'
res_id = self._test_resource(url)['id']
result = json.loads(update_resource(res_id))
result = self._get_update_resource_json(res_id)
assert result['size'] == len('test')
from hashlib import sha1
assert result['hash'] == sha1('test'.encode('utf-8')).hexdigest(), result
Expand All @@ -196,7 +195,7 @@ def test_resource_hash_and_content_length(self, client):
def test_archived_file(self, client):
url = client + '/?status=200&content=test&content-type=csv'
res_id = self._test_resource(url)['id']
result = json.loads(update_resource(res_id))
result = self._get_update_resource_json(res_id)

assert result['cache_filepath']
assert os.path.exists(result['cache_filepath'])
Expand All @@ -211,14 +210,14 @@ def test_archived_file(self, client):
def test_update_url_with_unknown_content_type(self, client):
url = client + '/?content-type=application/foo&content=test'
res_id = self._test_resource(url, format='foo')['id'] # format has no effect
result = json.loads(update_resource(res_id))
result = self._get_update_resource_json(res_id)
assert result, result
assert result['mimetype'] == 'application/foo' # stored from the header

def test_wms_1_3(self, client):
url = client + '/WMS_1_3/'
res_id = self._test_resource(url)['id']
result = json.loads(update_resource(res_id))
result = self._get_update_resource_json(res_id)
assert result, result
assert result['request_type'] == 'WMS 1.3'

Expand Down Expand Up @@ -268,23 +267,23 @@ def test_file_too_large_2(self, client):
def test_content_length_not_integer(self, client):
url = client + '/?status=200&content=content&length=abc&content-type=csv'
res_id = self._test_resource(url)['id']
result = json.loads(update_resource(res_id))
result = self._get_update_resource_json(res_id)
assert result, result

def test_content_length_repeated(self, client):
url = client + '/?status=200&content=content&repeat-length&content-type=csv'
# listing the Content-Length header twice causes requests to
# store the value as a comma-separated list
res_id = self._test_resource(url)['id']
result = json.loads(update_resource(res_id))
result = self._get_update_resource_json(res_id)
assert result, result

def test_url_with_30x_follows_and_records_redirect(self, client):
url = client + '/'
redirect_url = url + u'?status=200&content=test&content-type=text/csv'
url += u'?status=301&location=%s' % quote_plus(redirect_url)
res_id = self._test_resource(url)['id']
result = json.loads(update_resource(res_id))
result = self._get_update_resource_json(res_id)
assert result
assert result['url_redirected_to'] == redirect_url

Expand Down Expand Up @@ -329,6 +328,11 @@ def test_ipipe_notified_dataset(self, client):
assert params.get('package_id') == pkg['id']
assert params.get('resource_id') is None

def _get_update_resource_json(self, id):
result = update_resource(resource_id=id)
assert result, "update_resource returned: {}".format(result)
return json.loads(result)


class TestDownload:
'''Tests of the download method (and things it calls).
Expand All @@ -348,7 +352,7 @@ def _test_resource(self, url, format=None):
pkg = {'name': 'testpkg', 'resources': [
{'url': url, 'format': format or 'TXT', 'description': 'Test'}
]}
pkg = get_action('package_create')(context, pkg)
pkg = ckan_factories.Dataset(**pkg)
return pkg['resources'][0]

def test_head_unsupported(self, client):
Expand Down

0 comments on commit 2502c5d

Please sign in to comment.