diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index 928a0d9d..33fde11e 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -614,7 +614,7 @@ def harvest_jobs_run(context, data_dict): job_obj = HarvestJob.get(job['id']) if timeout: last_time = job_obj.get_last_action_time() - now = datetime.datetime.now() + now = datetime.datetime.utcnow() if now - last_time > datetime.timedelta(minutes=int(timeout)): msg = 'Job {} timeout ({} minutes)\n'.format(job_obj.id, timeout) msg += '\tJob created: {}\n'.format(job_obj.created) diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index 08d158ab..61e202b1 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -132,14 +132,16 @@ def resubmit_jobs(): # fetch queue harvest_object_pending = redis.keys(get_fetch_routing_key() + ':*') for key in harvest_object_pending: - redis_key = redis.get(key) - if redis_key is None: - log.info('Fetch Queue: Redis cannot get key {}'.format(key)) + redis_value = redis.get(key) + if redis_value is None: + log.info('Fetch Queue: Redis cannot get value for key {}'.format(key)) continue date_of_key = datetime.datetime.strptime( - redis_key, "%Y-%m-%d %H:%M:%S.%f") + redis_value, "%Y-%m-%d %H:%M:%S.%f") + log.debug('[Fetch queue]: Check key {} with value {}'.format(key, date_of_key)) # 3 minutes for fetch and import max if (datetime.datetime.now() - date_of_key).seconds > 180: + log.debug('[Fetch queue]: Re-new harvest object with KEY {} in redis'.format(key)) redis.rpush(get_fetch_routing_key(), json.dumps({'harvest_object_id': key.split(':')[-1]}) ) @@ -148,14 +150,16 @@ def resubmit_jobs(): # gather queue harvest_jobs_pending = redis.keys(get_gather_routing_key() + ':*') for key in harvest_jobs_pending: - redis_key = redis.get(key) - if redis_key is None: - log.info('Gather Queue: Redis cannot get key {}'.format(key)) + redis_value = redis.get(key) + if redis_value is None: + log.info('Gather Queue: Redis cannot get value for key {}'.format(key)) continue date_of_key = datetime.datetime.strptime( - redis_key, "%Y-%m-%d %H:%M:%S.%f") + redis_value, "%Y-%m-%d %H:%M:%S.%f") + log.debug('[Gather queue]: Check key {} with value {}'.format(key, date_of_key)) # 3 hours for a gather if (datetime.datetime.now() - date_of_key).seconds > 7200: + log.debug('[Gather queue]: Re-new harvest job with KEY {} in redis'.format(key)) redis.rpush(get_gather_routing_key(), json.dumps({'harvest_job_id': key.split(':')[-1]}) ) @@ -186,6 +190,8 @@ def resubmit_objects(): log.debug('Re-sent object {} to the fetch queue'.format(object_id)) publisher.send({'harvest_object_id': object_id}) + publisher.close() + class Publisher(object): def __init__(self, connection, channel, exchange, routing_key): diff --git a/ckanext/harvest/tests/test_queue.py b/ckanext/harvest/tests/test_queue.py index 1bf79d3f..e89be530 100644 --- a/ckanext/harvest/tests/test_queue.py +++ b/ckanext/harvest/tests/test_queue.py @@ -6,7 +6,7 @@ import ckanext.harvest.queue as queue from ckan.plugins.core import SingletonPlugin, implements import json -import ckan.logic as logic +from ckan.plugins import toolkit from ckan import model from ckan.lib.base import config import uuid @@ -44,7 +44,7 @@ def import_stage(self, harvest_object): assert harvest_object.fetch_finished is not None assert harvest_object.import_started is not None - user = logic.get_action('get_site_user')( + user = toolkit.get_action('get_site_user')( {'model': model, 'ignore_auth': True}, {} )['name'] @@ -57,7 +57,7 @@ def import_stage(self, harvest_object): else: logic_function = 'package_create' - package_dict = logic.get_action(logic_function)( + package_dict = toolkit.get_action(logic_function)( {'model': model, 'session': model.Session, 'user': user, 'api_version': 3, 'ignore_auth': True}, json.loads(harvest_object.content) @@ -91,64 +91,25 @@ class TestHarvestQueue(object): def test_01_basic_harvester(self): + if config.get('ckan.harvest.mq.type') == 'redis': + # make sure that there are no old elements in the redis db + redis = queue.get_connection() + redis.flushdb() + # make sure queues/exchanges are created first and are empty consumer = queue.get_gather_consumer() consumer_fetch = queue.get_fetch_consumer() consumer.queue_purge(queue=queue.get_gather_queue_name()) consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name()) - user = logic.get_action('get_site_user')( + user = toolkit.get_action('get_site_user')( {'model': model, 'ignore_auth': True}, {} )['name'] context = {'model': model, 'session': model.Session, 'user': user, 'api_version': 3, 'ignore_auth': True} - source_dict = { - 'title': 'Test Source', - 'name': 'test-source', - 'url': 'basic_test', - 'source_type': 'test', - } - - harvest_source = logic.get_action('harvest_source_create')( - context, - source_dict - ) - - assert harvest_source['source_type'] == 'test', harvest_source - assert harvest_source['url'] == 'basic_test', harvest_source - - harvest_job = logic.get_action('harvest_job_create')( - context, - {'source_id': harvest_source['id'], 'run': True} - ) - - job_id = harvest_job['id'] - - assert harvest_job['source_id'] == harvest_source['id'], harvest_job - - assert harvest_job['status'] == u'Running' - - assert logic.get_action('harvest_job_show')( - context, - {'id': job_id} - )['status'] == u'Running' - - # pop on item off the queue and run the callback - reply = consumer.basic_get(queue='ckan.harvest.gather') - - queue.gather_callback(consumer, *reply) - - all_objects = model.Session.query(HarvestObject).all() - - assert len(all_objects) == 3 - assert all_objects[0].state == 'WAITING' - assert all_objects[1].state == 'WAITING' - assert all_objects[2].state == 'WAITING' - - assert len(model.Session.query(HarvestObject).all()) == 3 - assert len(model.Session.query(HarvestObjectExtra).all()) == 1 + harvest_source, job_id = self._create_harvest_job_and_finish_gather_stage(consumer, context) # do three times as three harvest objects reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch') @@ -173,12 +134,12 @@ def test_01_basic_harvester(self): assert all_objects[2].report_status == 'added' # fire run again to check if job is set to Finished - logic.get_action('harvest_jobs_run')( + toolkit.get_action('harvest_jobs_run')( context, {'source_id': harvest_source['id']} ) - harvest_job = logic.get_action('harvest_job_show')( + harvest_job = toolkit.get_action('harvest_job_show')( context, {'id': job_id} ) @@ -186,7 +147,7 @@ def test_01_basic_harvester(self): assert harvest_job['status'] == u'Finished' assert harvest_job['stats'] == {'added': 3, 'updated': 0, 'not modified': 0, 'errored': 0, 'deleted': 0} - harvest_source_dict = logic.get_action('harvest_source_show')( + harvest_source_dict = toolkit.get_action('harvest_source_show')( context, {'id': harvest_source['id']} ) @@ -197,13 +158,13 @@ def test_01_basic_harvester(self): assert harvest_source_dict['status']['job_count'] == 1 # Second run - harvest_job = logic.get_action('harvest_job_create')( + harvest_job = toolkit.get_action('harvest_job_create')( context, {'source_id': harvest_source['id'], 'run': True} ) job_id = harvest_job['id'] - assert logic.get_action('harvest_job_show')( + assert toolkit.get_action('harvest_job_show')( context, {'id': job_id} )['status'] == u'Running' @@ -238,18 +199,18 @@ def test_01_basic_harvester(self): assert len(all_objects) == 1 # run to make sure job is marked as finshed - logic.get_action('harvest_jobs_run')( + toolkit.get_action('harvest_jobs_run')( context, {'source_id': harvest_source['id']} ) - harvest_job = logic.get_action('harvest_job_show')( + harvest_job = toolkit.get_action('harvest_job_show')( context, {'id': job_id} ) assert harvest_job['stats'] == {'added': 0, 'updated': 2, 'not modified': 0, 'errored': 0, 'deleted': 1} - harvest_source_dict = logic.get_action('harvest_source_show')( + harvest_source_dict = toolkit.get_action('harvest_source_show')( context, {'id': harvest_source['id']} ) @@ -295,6 +256,122 @@ def test_redis_queue_purging(self): finally: redis.delete('ckanext-harvest:some-random-key') + def test_resubmit_objects(self): + ''' + Test that only harvest objects re-submitted which were not be present in the redis fetch queue. + ''' + if config.get('ckan.harvest.mq.type') != 'redis': + pytest.skip() + # make sure that there are no old elements in the redis db + redis = queue.get_connection() + fetch_routing_key = queue.get_fetch_routing_key() + redis.flushdb() + try: + # make sure queues/exchanges are created first and are empty + consumer = queue.get_gather_consumer() + consumer_fetch = queue.get_fetch_consumer() + consumer.queue_purge(queue=queue.get_gather_queue_name()) + consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name()) + + user = toolkit.get_action('get_site_user')( + {'model': model, 'ignore_auth': True}, {} + )['name'] + + context = {'model': model, 'session': model.Session, + 'user': user, 'api_version': 3, 'ignore_auth': True} + + harvest_source, job_id = self._create_harvest_job_and_finish_gather_stage(consumer, context) + + assert redis.llen(fetch_routing_key) == 3 + + # do only one time for the first harvest object + reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch') + queue.fetch_callback(consumer_fetch, *reply) + + count = model.Session.query(model.Package) \ + .filter(model.Package.type == 'dataset') \ + .count() + assert count == 1 + + all_objects = model.Session.query(HarvestObject).order_by(HarvestObject.state.asc()).all() + assert len(all_objects) == 3 + assert all_objects[0].state == 'COMPLETE' + assert all_objects[0].report_status == 'added' + assert all_objects[0].current is True + assert all_objects[1].state == 'WAITING' + assert all_objects[1].current is False + assert all_objects[2].state == 'WAITING' + assert all_objects[2].current is False + + assert len(redis.keys(fetch_routing_key + ':*')) == 0 + assert redis.llen(fetch_routing_key) == 2 + + # Remove one object from redis that should be re-sent to the fetch queue + reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch') + fetch_queue_items = redis.lrange(fetch_routing_key, 0, 10) + assert len(fetch_queue_items) == 1 + harvest_object_id = reply[2] + assert fetch_queue_items[0] != harvest_object_id + + queue.resubmit_objects() + + assert redis.llen(fetch_routing_key) == 2 + fetch_queue_items = redis.lrange(fetch_routing_key, 0, 10) + assert harvest_object_id in fetch_queue_items + assert redis.dbsize() == 1 + finally: + redis.flushdb() + + def _create_harvest_job_and_finish_gather_stage(self, consumer, context): + source_dict = {'title': 'Test Source', + 'name': 'test-source', + 'url': 'basic_test', + 'source_type': 'test'} + + try: + harvest_source = toolkit.get_action('harvest_source_create')( + context, + source_dict) + except toolkit.ValidationError: + harvest_source = toolkit.get_action('harvest_source_show')( + context, + {'id': source_dict['name']} + ) + pass + + assert harvest_source['source_type'] == 'test', harvest_source + assert harvest_source['url'] == 'basic_test', harvest_source + + harvest_job = toolkit.get_action('harvest_job_create')( + context, + {'source_id': harvest_source['id'], 'run': True}) + job_id = harvest_job['id'] + + assert harvest_job['source_id'] == harvest_source['id'], harvest_job + assert harvest_job['status'] == u'Running' + + assert toolkit.get_action('harvest_job_show')( + context, + {'id': job_id} + )['status'] == u'Running' + + # pop on item off the queue and run the callback + reply = consumer.basic_get(queue='ckan.harvest.gather') + + queue.gather_callback(consumer, *reply) + + all_objects = model.Session.query(HarvestObject).all() + + assert len(all_objects) == 3 + assert all_objects[0].state == 'WAITING' + assert all_objects[1].state == 'WAITING' + assert all_objects[2].state == 'WAITING' + + assert len(model.Session.query(HarvestObject).all()) == 3 + assert len(model.Session.query(HarvestObjectExtra).all()) == 1 + + return harvest_source, job_id + class TestHarvestCorruptRedis(object):