From ed5c3bf68ec207c0637108626a95c11210cbb66b Mon Sep 17 00:00:00 2001 From: pdelboca Date: Thu, 2 Mar 2023 12:10:46 -0300 Subject: [PATCH 1/4] Refactor test_jobs.py --- ckanext/xloader/tests/test_jobs.py | 845 ++++------------------------- test.ini | 2 +- 2 files changed, 109 insertions(+), 738 deletions(-) diff --git a/ckanext/xloader/tests/test_jobs.py b/ckanext/xloader/tests/test_jobs.py index 648a2451..5d3844e6 100644 --- a/ckanext/xloader/tests/test_jobs.py +++ b/ckanext/xloader/tests/test_jobs.py @@ -1,740 +1,120 @@ -from __future__ import absolute_import -import os -import json -import random -import datetime -import time -import six - -try: - from collections import OrderedDict # from python 2.7 -except ImportError: - from sqlalchemy.util import OrderedDict import pytest +import io -from nose.tools import make_decorator -try: - from unittest import mock -except ImportError: - import mock -import responses -from sqlalchemy import MetaData, Table -from sqlalchemy.sql import select - -import ckan.plugins as p +from datetime import datetime -try: - config = p.toolkit.config -except AttributeError: - from pylons import config - -from ckanext.xloader import jobs -from ckanext.xloader import db as jobs_db -from ckanext.xloader.loader import get_write_engine +from requests import Response +from ckan.cli.cli import ckan +from ckan.plugins import toolkit from ckan.tests import helpers, factories -SOURCE_URL = "http://www.example.com/static/file" - - -def mock_actions(func): - """ - Decorator that mocks actions used by these tests - Based on ckan.test.helpers.mock_action - """ - - def wrapper(*args, **kwargs): - # Mock CKAN's resource_show API - from ckan.logic import get_action as original_get_action - - def side_effect(called_action_name): - if called_action_name == "resource_show": - - def mock_resource_show(context, data_dict): - return { - "id": data_dict["id"], - "name": "short name", - "url": SOURCE_URL, - "format": "", - "package_id": "test-pkg", - } - - 
return mock_resource_show - elif called_action_name == "package_show": - - def mock_package_show(context, data_dict): - return { - "id": data_dict["id"], - "name": "pkg-name", - } - - return mock_package_show - else: - return original_get_action(called_action_name) - - try: - with mock.patch( - "ckanext.xloader.jobs.get_action" - ) as mock_get_action: - mock_get_action.side_effect = side_effect - - return_value = func(*args, **kwargs) - finally: - pass - # Make sure to stop the mock, even with an exception - # mock_action.stop() - return return_value - - return make_decorator(func)(wrapper) - - -@pytest.mark.skip -@pytest.mark.usefixtures("with_plugins") -@pytest.mark.ckan_config("ckan.plugins", "datastore xloader") -class TestxloaderDataIntoDatastore(object): - - @pytest.fixture(autouse=True) - def setup_class(self): - self.host = "www.ckan.org" - self.api_key = "my-fake-key" - self.resource_id = "foo-bar-42" - factories.Resource(id=self.resource_id) - jobs_db.init(config, echo=False) - # drop test table - engine, conn = self.get_datastore_engine_and_connection() - conn.execute('DROP TABLE IF EXISTS "{}"'.format(self.resource_id)) - yield - if "_datastore" in dir(self): - connection = self._datastore[1] - connection.close() - - def register_urls( - self, filename="simple.csv", content_type="application/csv" - ): - """Mock some test URLs with responses. - - Mocks some URLs related to a data file and a CKAN resource that - contains the data file, including the URL of the data file itself and - the resource_show, resource_update and datastore_delete URLs. 
- - :returns: a 2-tuple containing the URL of the data file itself and the - resource_show URL for the resource that contains the data file - - """ - responses.add_passthru(config["solr_url"]) - - # A URL that just returns a static file - responses.add( - responses.GET, - SOURCE_URL, - body=get_sample_file(filename), - content_type=content_type, - ) - - # A URL that mocks the response that CKAN's resource_update API would - # give after successfully updating a resource. - resource_update_url = ( - "http://www.ckan.org/api/3/action/resource_update" - ) - responses.add( - responses.POST, - resource_update_url, - body=json.dumps({"success": True}), - content_type="application/json", - ) - - # A URL that mock's the response that CKAN's datastore plugin's - # datastore_delete API would give after successfully deleting a - # resource from the datastore. - datastore_del_url = "http://www.ckan.org/api/3/action/datastore_delete" - responses.add( - responses.POST, - datastore_del_url, - body=json.dumps({"success": True}), - content_type="application/json", - ) - - self.callback_url = "http://www.ckan.org/api/3/action/xloader_hook" - responses.add( - responses.POST, - self.callback_url, - body=json.dumps({"success": True}), - content_type="application/json", - ) - - @classmethod - def get_datastore_engine_and_connection(cls): - if "_datastore" not in dir(cls): - engine = get_write_engine() - conn = engine.connect() - cls._datastore = (engine, conn) - return cls._datastore - - def get_datastore_table(self): - engine, conn = self.get_datastore_engine_and_connection() - meta = MetaData(bind=engine) - table = Table( - self.resource_id, meta, autoload=True, autoload_with=engine - ) - s = select([table]) - with conn.begin(): - result = conn.execute(s) - return dict( - num_rows=result.rowcount, - headers=list(result.keys()), - header_dict=OrderedDict( - [(c.key, six.text_type(c.type)) for c in table.columns] - ), - rows=result.fetchall(), - ) - - def get_load_logs(self, task_id): - 
conn = jobs_db.ENGINE.connect() - logs = jobs_db.LOGS_TABLE - result = conn.execute( - select([logs.c.level, logs.c.message]).where( - logs.c.job_id == task_id - ) - ) - return Logs(result.fetchall()) - - def get_time_of_last_analyze(self): - # When ANALYZE runs it appears to take a moment for the - # pg_stat_user_tables to update, which we use to check analyze runs, - # so sadly we need a sleep :( - # DR: 0.25 is pretty reliable on my machine, but give a wide margin - time.sleep(1) - engine, conn = self.get_datastore_engine_and_connection() - result = conn.execute( - """ - SELECT last_analyze, last_autoanalyze - FROM pg_stat_user_tables - WHERE relname='{}'; - """.format( - self.resource_id - ) - ) - last_analyze_datetimes = result.fetchall()[0] - return max([x for x in last_analyze_datetimes if x] or [None]) - - @mock_actions - @responses.activate - def test_simple_csv(self): - # Test not only the load and xloader_hook is called at the end - self.register_urls(filename="simple.csv") - data = { - "api_key": self.api_key, - "job_type": "xloader_to_datastore", - "result_url": self.callback_url, - "metadata": { - "ckan_url": "http://%s/" % self.host, - "resource_id": self.resource_id, - }, - } - job_id = "test{}".format(random.randint(0, 1e5)) - - with mock.patch( - "ckanext.xloader.jobs.set_resource_metadata" - ) as mocked_set_resource_metadata: - # in tests we call jobs directly, rather than use rq, so mock - # get_current_job() - with mock.patch( - "ckanext.xloader.jobs.get_current_job", - return_value=mock.Mock(id=job_id), - ): - result = jobs.xloader_data_into_datastore(data) - assert result is None, jobs_db.get_job(job_id)["error"]["message"] - - # Check it said it was successful - assert ( - responses.calls[-1].request.url - == "http://www.ckan.org/api/3/action/xloader_hook" - ) - job_dict = json.loads(responses.calls[-1].request.body) - assert job_dict["status"] == u"complete", job_dict - assert job_dict == { - u"metadata": { - 
u"datastore_contains_all_records_of_source_file": True, - u"datastore_active": True, - u"ckan_url": u"http://www.ckan.org/", - u"resource_id": u"foo-bar-42", - }, - u"status": u"complete", - } - - # Check the load - data = self.get_datastore_table() - assert data["headers"] == [ - "_id", - "_full_text", - "date", - "temperature", - "place", - ] - assert data["header_dict"]["date"] == "TEXT" - # 'TIMESTAMP WITHOUT TIME ZONE') - assert data["header_dict"]["temperature"] == "TEXT" # 'NUMERIC') - assert data["header_dict"]["place"] == "TEXT" # 'TEXT') - assert data["num_rows"] == 6 - assert data["rows"][0][2:] == (u"2011-01-01", u"1", u"Galway") - # (datetime.datetime(2011, 1, 1), 1, 'Galway')) - - # Check it wanted to set the datastore_active=True - mocked_set_resource_metadata.assert_called_once() - assert mocked_set_resource_metadata.call_args[1]["update_dict"] == { - "datastore_contains_all_records_of_source_file": True, - "datastore_active": True, - "ckan_url": "http://www.ckan.org/", - "resource_id": "foo-bar-42", - } - - logs = self.get_load_logs(job_id) - logs.assert_no_errors() - - job = jobs_db.get_job(job_id) - assert job["status"] == u"complete" - assert job["error"] is None - - # Check ANALYZE was run - last_analyze = self.get_time_of_last_analyze() - assert last_analyze - - @mock_actions - @responses.activate - @mock.patch("ckanext.xloader.jobs.MAX_CONTENT_LENGTH", 10000) - @mock.patch("ckanext.xloader.jobs.MAX_EXCERPT_LINES", 100) - def test_too_large_csv(self): - - # Test not only the load and xloader_hook is called at the end - self.register_urls(filename="simple-large.csv") - data = { - "api_key": self.api_key, - "job_type": "xloader_to_datastore", - "result_url": self.callback_url, - "metadata": { - "ckan_url": "http://%s/" % self.host, - "resource_id": self.resource_id, - }, - } - job_id = "test{}".format(random.randint(0, 1e5)) - - with mock.patch( - "ckanext.xloader.jobs.set_resource_metadata" - ) as mocked_set_resource_metadata: - # in tests we 
call jobs directly, rather than use rq, so mock - # get_current_job() - with mock.patch( - "ckanext.xloader.jobs.get_current_job", - return_value=mock.Mock(id=job_id), - ): - result = jobs.xloader_data_into_datastore(data) - assert result is None, jobs_db.get_job(job_id)["error"]["message"] - - # Check it said it was successful - assert ( - responses.calls[-1].request.url - == "http://www.ckan.org/api/3/action/xloader_hook" - ) - job_dict = json.loads(responses.calls[-1].request.body) - assert job_dict["status"] == u"complete", job_dict - assert job_dict == { - u"metadata": { - u"datastore_contains_all_records_of_source_file": False, - u"datastore_active": True, - u"ckan_url": u"http://www.ckan.org/", - u"resource_id": u"foo-bar-42", - }, - u"status": u"complete", - } - - # Check the load - data = self.get_datastore_table() - assert data["headers"] == ["_id", "_full_text", "id", "text"] - assert data["header_dict"]["id"] == "TEXT" - # 'TIMESTAMP WITHOUT TIME ZONE') - assert data["header_dict"]["text"] == "TEXT" - assert data["num_rows"] <= 100 - assert data["num_rows"] > 0 - assert data["rows"][0][2:] == (u"1", u"a") - - # Check it wanted to set the datastore_active=True - mocked_set_resource_metadata.assert_called_once() - assert mocked_set_resource_metadata.call_args[1]["update_dict"] == { - "datastore_contains_all_records_of_source_file": False, - "datastore_active": True, - "ckan_url": "http://www.ckan.org/", - "resource_id": "foo-bar-42", - } - - logs = self.get_load_logs(job_id) - logs.assert_no_errors() +from unittest import mock - job = jobs_db.get_job(job_id) - assert job["status"] == u"complete" - assert job["error"] is None - - # Check ANALYZE was run - last_analyze = self.get_time_of_last_analyze() - assert last_analyze - - @mock_actions - @responses.activate - @mock.patch("ckanext.xloader.jobs.MAX_CONTENT_LENGTH", 10000) - @mock.patch("ckanext.xloader.jobs.MAX_EXCERPT_LINES", 100) - def test_too_large_xls(self): - - # Test not only the load and 
xloader_hook is called at the end - self.register_urls(filename="simple-large.xls") - data = { - "api_key": self.api_key, - "job_type": "xloader_to_datastore", - "result_url": self.callback_url, - "metadata": { - "ckan_url": "http://%s/" % self.host, - "resource_id": self.resource_id, - }, - } - job_id = "test{}".format(random.randint(0, 1e5)) - - with mock.patch("ckanext.xloader.jobs.set_resource_metadata"): - # in tests we call jobs directly, rather than use rq, so mock - # get_current_job() - with mock.patch( - "ckanext.xloader.jobs.get_current_job", - return_value=mock.Mock(id=job_id), - ): - result = jobs.xloader_data_into_datastore(data) - assert result is not None, jobs_db.get_job(job_id)["error"]["message"] - - # Check it said it was successful - assert ( - responses.calls[-1].request.url - == "http://www.ckan.org/api/3/action/xloader_hook" - ) - job_dict = json.loads(responses.calls[-1].request.body) - assert job_dict["status"] == u"error", job_dict - assert job_dict == { - u"status": u"error", - u"metadata": { - u"ckan_url": u"http://www.ckan.org/", - u"datastore_contains_all_records_of_source_file": False, - u"resource_id": u"foo-bar-42", - }, - u"error": u"Loading file raised an error: array index out of range", - } - - job = jobs_db.get_job(job_id) - assert job["status"] == u"error" - assert job["error"] == { - u"message": u"Loading file raised an error: array index out of range" - } - - @mock_actions - @responses.activate - def test_tabulator(self): - # xloader's COPY can't handle xls, so it will be dealt with by - # tabulator - self.register_urls( - filename="simple.xls", content_type="application/vnd.ms-excel" - ) - data = { - "api_key": self.api_key, - "job_type": "xloader_to_datastore", - "result_url": self.callback_url, - "metadata": { - "ckan_url": "http://%s/" % self.host, - "resource_id": self.resource_id, - }, - } - job_id = "test{}".format(random.randint(0, 1e5)) - - with mock.patch( - "ckanext.xloader.jobs.set_resource_metadata" - ) as 
mocked_set_resource_metadata: - # in tests we call jobs directly, rather than use rq, so mock - # get_current_job() - with mock.patch( - "ckanext.xloader.jobs.get_current_job", - return_value=mock.Mock(id=job_id), - ): - result = jobs.xloader_data_into_datastore(data) - assert result is None - - # Check it said it was successful - assert ( - responses.calls[-1].request.url - == "http://www.ckan.org/api/3/action/xloader_hook" - ) - job_dict = json.loads(responses.calls[-1].request.body) - assert job_dict["status"] == u"complete", job_dict - assert job_dict == { - u"metadata": { - u"datastore_contains_all_records_of_source_file": True, - u"datastore_active": True, - u"ckan_url": u"http://www.ckan.org/", - u"resource_id": u"foo-bar-42", - }, - u"status": u"complete", - } - - # Check the load - data = self.get_datastore_table() - assert data["headers"] == [ - "_id", - "_full_text", - "date", - "temperature", - "place", - ] - assert data["header_dict"]["date"] == "TIMESTAMP WITHOUT TIME ZONE" - assert data["header_dict"]["temperature"] == "NUMERIC" - assert data["header_dict"]["place"] == "TEXT" - assert data["num_rows"] == 6 - assert data["rows"][0][2:] == ( - datetime.datetime(2011, 1, 1), - 1, - u"Galway", - ) - - # Check it wanted to set the datastore_active=True - mocked_set_resource_metadata.assert_called_once() - assert mocked_set_resource_metadata.call_args[1]["update_dict"] == { - "ckan_url": "http://www.ckan.org/", - "datastore_contains_all_records_of_source_file": True, - "datastore_active": True, - "resource_id": "foo-bar-42", - } - - # check logs have the error doing the COPY - logs = self.get_load_logs(job_id) - copy_error_index = None - for i, log in enumerate(logs): - if log[0] == "WARNING" and log[1].startswith( - "Load using COPY failed: Error during the load into PostgreSQL" - ): - copy_error_index = i - break - assert copy_error_index, "Missing COPY error" - - # check messytable portion of the logs - logs = Logs(logs[copy_error_index + 1:]) - assert 
logs[0] == (u"INFO", u"Trying again with tabulator") - logs.assert_no_errors() - - # Check ANALYZE was run - last_analyze = self.get_time_of_last_analyze() - assert last_analyze - - @mock_actions - @responses.activate - def test_umlaut_and_extra_comma(self): - self.register_urls(filename="umlaut_and_extra_comma.csv") - # This csv has an extra comma which causes the COPY to throw a - # psycopg2.DataError and the umlaut can cause problems for logging the - # error. We need to check that it correctly reverts to using - # tabulator to load it - data = { - "api_key": self.api_key, - "job_type": "xloader_to_datastore", - "result_url": self.callback_url, - "metadata": { - "ckan_url": "http://%s/" % self.host, - "resource_id": self.resource_id, - }, - } - job_id = "test{}".format(random.randint(0, 1e5)) - - with mock.patch("ckanext.xloader.jobs.set_resource_metadata"): - # in tests we call jobs directly, rather than use rq, so mock - # get_current_job() - with mock.patch( - "ckanext.xloader.jobs.get_current_job", - return_value=mock.Mock(id=job_id), - ): - result = jobs.xloader_data_into_datastore(data) - assert result is None, jobs_db.get_job(job_id)["error"]["message"] - - # Check it said it was successful - assert ( - responses.calls[-1].request.url - == "http://www.ckan.org/api/3/action/xloader_hook" - ) - job_dict = json.loads(responses.calls[-1].request.body) - assert job_dict["status"] == u"complete", job_dict - assert job_dict == { - u"metadata": { - u"datastore_contains_all_records_of_source_file": True, - u"datastore_active": True, - u"ckan_url": u"http://www.ckan.org/", - u"resource_id": u"foo-bar-42", - }, - u"status": u"complete", - } - - logs = self.get_load_logs(job_id) - logs.assert_no_errors() - - job = jobs_db.get_job(job_id) - assert job["status"] == u"complete" - assert job["error"] is None - - @mock_actions - @responses.activate - def test_invalid_byte_sequence(self): - self.register_urls(filename='go-realtime.xlsx') - # This xlsx throws an Postgres 
error on INSERT because of - # 'invalid byte sequence for encoding "UTF8": 0x00' which causes - # the COPY to throw a psycopg2.DataError and umlauts in the file can - # cause problems for logging the error. We need to check that - # it correctly reverts to using tabulator to load it - data = { - 'api_key': self.api_key, - 'job_type': 'xloader_to_datastore', - 'result_url': self.callback_url, - 'metadata': { - 'ckan_url': 'http://%s/' % self.host, - 'resource_id': self.resource_id - } - } - job_id = "test{}".format(random.randint(0, 1e5)) - - with mock.patch('ckanext.xloader.jobs.set_datastore_active_flag'): - # in tests we call jobs directly, rather than use rq, so mock - # get_current_job() - with mock.patch( - "ckanext.xloader.jobs.get_current_job", - return_value=mock.Mock(id=job_id), - ): - result = jobs.xloader_data_into_datastore(data) - assert result is None, jobs_db.get_job(job_id)["error"]["message"] - - # Check it said it was successful - assert responses.calls[-1].request.url == \ - 'http://www.ckan.org/api/3/action/xloader_hook' - job_dict = json.loads(responses.calls[-1].request.body) - assert job_dict['status'] == u'complete', job_dict - assert job_dict == \ - {u'metadata': {u'ckan_url': u'http://www.ckan.org/', - u'resource_id': u'foo-bar-42'}, - u'status': u'complete'} - - logs = self.get_load_logs(job_id) - logs.assert_no_errors() - - job = jobs_db.get_job(job_id) - assert job['status'] == u'complete' - assert job['error'] is None - - @mock_actions - @responses.activate - def test_first_request_is_202_pending_response(self): - # when you first get the CSV it returns this 202 response, which is - # what this server does: https://data-cdfw.opendata.arcgis.com/datasets - responses.add( - responses.GET, - SOURCE_URL, - status=202, - body='{"processingTime":"8.716 seconds","status":"Processing","generating":{}}', - content_type="application/json", - ) - # subsequent GETs of the CSV work fine - self.register_urls() - data = { - "api_key": self.api_key, - 
"job_type": "xloader_to_datastore", - "result_url": self.callback_url, - "metadata": { - "ckan_url": "http://%s/" % self.host, - "resource_id": self.resource_id, - }, - } - job_id = "test{}".format(random.randint(0, 1e5)) - - with mock.patch( - "ckanext.xloader.jobs.set_resource_metadata" - ) as mocked_set_resource_metadata: - # in tests we call jobs directly, rather than use rq, so mock - # get_current_job() - with mock.patch( - "ckanext.xloader.jobs.get_current_job", - return_value=mock.Mock(id=job_id), - ): - result = jobs.xloader_data_into_datastore(data) - assert result is None, jobs_db.get_job(job_id)["error"]["message"] - - # Check it said it was successful - assert ( - responses.calls[-1].request.url - == "http://www.ckan.org/api/3/action/xloader_hook" - ) - job_dict = json.loads(responses.calls[-1].request.body) - assert job_dict["status"] == u"complete", job_dict - assert job_dict == { - u"metadata": { - u"ckan_url": u"http://www.ckan.org/", - u"datastore_contains_all_records_of_source_file": True, - u"datastore_active": True, - u"resource_id": u"foo-bar-42", - }, - u"status": u"complete", - } - - # Check the load - data = self.get_datastore_table() - assert data["headers"] == [ - "_id", - "_full_text", - "date", - "temperature", - "place", - ] - assert data["header_dict"]["date"] == "TEXT" - # 'TIMESTAMP WITHOUT TIME ZONE') - assert data["header_dict"]["temperature"] == "TEXT" # 'NUMERIC') - assert data["header_dict"]["place"] == "TEXT" # 'TEXT') - assert data["num_rows"] == 6 - assert data["rows"][0][2:] == (u"2011-01-01", u"1", u"Galway") - # (datetime.datetime(2011, 1, 1), 1, 'Galway')) - - # Check it wanted to set the datastore_active=True - mocked_set_resource_metadata.assert_called_once() - assert mocked_set_resource_metadata.call_args[1]["update_dict"] == { - "datastore_contains_all_records_of_source_file": True, - "datastore_active": True, - "ckan_url": "http://www.ckan.org/", - "resource_id": "foo-bar-42", - } - - logs = 
self.get_load_logs(job_id) - logs.assert_no_errors() - - job = jobs_db.get_job(job_id) - assert job["status"] == u"complete" - assert job["error"] is None - - -class Logs(list): - def get_errors(self): - return [message for level, message in self if level == "ERROR"] - - def grep(self, text): - return [message for level, message in self if text in message] - - def assert_no_errors(self): - errors = self.get_errors() - assert not errors, errors - - -def get_sample_file(filename): - filepath = os.path.join(os.path.dirname(__file__), "samples", filename) - return open(filepath).read() +from ckanext.xloader import jobs +_TEST_FILE_CONTENT = "x, y\n1,2\n2,4\n3,6\n4,8\n5,10" + + +def get_response(download_url, headers): + """Mock jobs.get_response() method.""" + resp = Response() + resp.raw = io.BytesIO(_TEST_FILE_CONTENT.encode()) + resp.headers = headers + return resp + + +def get_large_response(download_url, headers): + """Mock jobs.get_response() method to fake a large file.""" + resp = Response() + resp.raw = io.BytesIO(_TEST_FILE_CONTENT.encode()) + resp.headers = {'content-length': 2000000000} + return resp + + +@pytest.fixture +def data(create_with_upload): + dataset = factories.Dataset() + sysadmin = factories.SysadminWithToken() + resource = create_with_upload( + _TEST_FILE_CONTENT, + "multiplication_2.csv", + url="http://data", + package_id=dataset["id"] + ) + callback_url = toolkit.url_for( + "api.action", ver=3, logic_function="xloader_hook", qualified=True + ) + return { + 'api_key': sysadmin["token"], + 'job_type': 'xloader_to_datastore', + 'result_url': callback_url, + 'metadata': { + 'ignore_hash': True, + 'ckan_url': toolkit.config.get('ckan.site_url'), + 'resource_id': resource["id"], + 'set_url_type': False, + 'task_created': datetime.utcnow().isoformat(), + 'original_url': resource["url"], + } + } + + +@pytest.mark.usefixtures("clean_db", "with_plugins") +@pytest.mark.ckan_config("ckan.plugins", "datastore xloader resource_proxy") +class 
TestXLoaderJobs(helpers.FunctionalRQTestBase): + + def test_xloader_data_into_datastore(self, cli, data): + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "File hash: d44fa65eda3675e11710682fdb5f1648" in stdout + assert "Fields: [{'id': 'x', 'type': 'text'}, {'id': 'y', 'type': 'text'}]" in stdout + assert "Copying to database..." in stdout + assert "Creating search index..." in stdout + assert "Express Load completed" in stdout + + resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) + assert resource["datastore_contains_all_records_of_source_file"] + + def test_xloader_ignore_hash(self, cli, data): + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Express Load completed" in stdout + + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Copying to database..." 
in stdout + assert "Express Load completed" in stdout + + data["metadata"]["ignore_hash"] = False + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Ignoring resource - the file hash hasn't changed" in stdout + + def test_data_too_big_error_if_content_length_bigger_than_config(self, cli, data): + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_large_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Data too large to load into Datastore:" in stdout + + def test_data_max_excerpt_lines_config(self, cli, data): + self.enqueue(jobs.xloader_data_into_datastore, [data]) + with mock.patch("ckanext.xloader.jobs.get_response", get_large_response): + with mock.patch("ckanext.xloader.jobs.MAX_EXCERPT_LINES", 1): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Loading excerpt of ~1 lines to DataStore." 
in stdout + + resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) + assert resource["datastore_contains_all_records_of_source_file"] is False + + +@pytest.mark.usefixtures("clean_db") class TestSetResourceMetadata(object): - @classmethod - def setup_class(cls): - helpers.reset_db() - def test_simple(self): resource = factories.Resource() @@ -748,15 +128,6 @@ def test_simple(self): ) resource = helpers.call_action("resource_show", id=resource["id"]) - from pprint import pprint - - pprint(resource) - assert resource["datastore_contains_all_records_of_source_file"] in ( - True, - u"True", - ) - # I'm not quite sure why this is a string on travis - I get the bool - # locally - + assert resource["datastore_contains_all_records_of_source_file"] assert resource["datastore_active"] assert resource["ckan_url"] == "http://www.ckan.org/" diff --git a/test.ini b/test.ini index 1415d37f..7bfab684 100644 --- a/test.ini +++ b/test.ini @@ -15,7 +15,7 @@ use = config:../ckan/test-core.ini # Insert any custom config settings to be used when running your extension's # tests here. 
-ckan.plugins = xloader +ckan.plugins = xloader datastore ckanext.xloader.jobs_db.uri = sqlite:////tmp/jobs.db # Logging configuration From 060fd5bc70961bb15924f0ce9e7156f5809b4a15 Mon Sep 17 00:00:00 2001 From: pdelboca Date: Thu, 2 Mar 2023 12:12:27 -0300 Subject: [PATCH 2/4] Clean unneeded config override --- ckanext/xloader/tests/test_jobs.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ckanext/xloader/tests/test_jobs.py b/ckanext/xloader/tests/test_jobs.py index 5d3844e6..76854322 100644 --- a/ckanext/xloader/tests/test_jobs.py +++ b/ckanext/xloader/tests/test_jobs.py @@ -62,7 +62,6 @@ def data(create_with_upload): @pytest.mark.usefixtures("clean_db", "with_plugins") -@pytest.mark.ckan_config("ckan.plugins", "datastore xloader resource_proxy") class TestXLoaderJobs(helpers.FunctionalRQTestBase): def test_xloader_data_into_datastore(self, cli, data): From f4deb54b4a2c0d00a53156955f79c9e727f95116 Mon Sep 17 00:00:00 2001 From: pdelboca Date: Thu, 2 Mar 2023 12:34:39 -0300 Subject: [PATCH 3/4] Add fixture for CKAN 2.9 --- ckanext/xloader/tests/test_jobs.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/ckanext/xloader/tests/test_jobs.py b/ckanext/xloader/tests/test_jobs.py index 76854322..bfb40c17 100644 --- a/ckanext/xloader/tests/test_jobs.py +++ b/ckanext/xloader/tests/test_jobs.py @@ -12,6 +12,7 @@ from unittest import mock from ckanext.xloader import jobs +from ckanext.xloader.utils import get_xloader_user_apitoken _TEST_FILE_CONTENT = "x, y\n1,2\n2,4\n3,6\n4,8\n5,10" @@ -34,9 +35,20 @@ def get_large_response(download_url, headers): @pytest.fixture -def data(create_with_upload): +def apikey(): + try: + sysadmin = factories.SysadminWithToken() + except AttributeError: + # To provide support with CKAN 2.9 + sysadmin = factories.Sysadmin() + sysadmin["token"] = get_xloader_user_apitoken() + + return sysadmin["token"] + + +@pytest.fixture +def data(create_with_upload, apikey): dataset = factories.Dataset() - sysadmin = 
factories.SysadminWithToken() resource = create_with_upload( _TEST_FILE_CONTENT, "multiplication_2.csv", @@ -47,7 +59,7 @@ def data(create_with_upload): "api.action", ver=3, logic_function="xloader_hook", qualified=True ) return { - 'api_key': sysadmin["token"], + 'api_key': apikey, 'job_type': 'xloader_to_datastore', 'result_url': callback_url, 'metadata': { From b55b0b5595cbd1bd51e981808a7288d92a479088 Mon Sep 17 00:00:00 2001 From: pdelboca Date: Tue, 7 Mar 2023 08:29:14 -0300 Subject: [PATCH 4/4] Refactor to toolkit.check_ckan_version --- ckanext/xloader/tests/test_jobs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ckanext/xloader/tests/test_jobs.py b/ckanext/xloader/tests/test_jobs.py index bfb40c17..e819dad9 100644 --- a/ckanext/xloader/tests/test_jobs.py +++ b/ckanext/xloader/tests/test_jobs.py @@ -36,9 +36,9 @@ def get_large_response(download_url, headers): @pytest.fixture def apikey(): - try: + if toolkit.check_ckan_version(min_version="2.10"): sysadmin = factories.SysadminWithToken() - except AttributeError: + else: # To provide support with CKAN 2.9 sysadmin = factories.Sysadmin() sysadmin["token"] = get_xloader_user_apitoken()