Merge pull request #482 from GovDataOfficial/improve-test-coverage-and-fixes-timeout-calculation

Improve test coverage and fix timeout calculation
metaodi authored Nov 29, 2021
2 parents 9d5679f + d2cf517 commit d84d847
Showing 3 changed files with 149 additions and 66 deletions.
2 changes: 1 addition & 1 deletion ckanext/harvest/logic/action/update.py
@@ -614,7 +614,7 @@ def harvest_jobs_run(context, data_dict):
job_obj = HarvestJob.get(job['id'])
if timeout:
last_time = job_obj.get_last_action_time()
now = datetime.datetime.now()
now = datetime.datetime.utcnow()
if now - last_time > datetime.timedelta(minutes=int(timeout)):
msg = 'Job {} timeout ({} minutes)\n'.format(job_obj.id, timeout)
msg += '\tJob created: {}\n'.format(job_obj.created)
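For context, the timeout check above compares the job's last recorded action time with the current time. A minimal, self-contained sketch of the corrected comparison, assuming (as the patch implies) that the stored timestamps are naive UTC datetimes; the helper name and sample values below are illustrative, not part of the extension:

import datetime

def job_timed_out(last_action_time, timeout_minutes):
    # last_action_time is assumed to be a naive UTC datetime, matching how the
    # harvester records job activity. Using utcnow() keeps both operands in the
    # same timezone; datetime.now() would skew the difference by the local UTC
    # offset, so jobs could be timed out too early or far too late.
    now = datetime.datetime.utcnow()
    return now - last_action_time > datetime.timedelta(minutes=int(timeout_minutes))

# Example: last action 10 minutes ago with a 5 minute timeout -> True
last_action = datetime.datetime.utcnow() - datetime.timedelta(minutes=10)
print(job_timed_out(last_action, 5))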
22 changes: 14 additions & 8 deletions ckanext/harvest/queue.py
@@ -132,14 +132,16 @@ def resubmit_jobs():
# fetch queue
harvest_object_pending = redis.keys(get_fetch_routing_key() + ':*')
for key in harvest_object_pending:
redis_key = redis.get(key)
if redis_key is None:
log.info('Fetch Queue: Redis cannot get key {}'.format(key))
redis_value = redis.get(key)
if redis_value is None:
log.info('Fetch Queue: Redis cannot get value for key {}'.format(key))
continue
date_of_key = datetime.datetime.strptime(
redis_key, "%Y-%m-%d %H:%M:%S.%f")
redis_value, "%Y-%m-%d %H:%M:%S.%f")
log.debug('[Fetch queue]: Check key {} with value {}'.format(key, date_of_key))
# 3 minutes for fetch and import max
if (datetime.datetime.now() - date_of_key).seconds > 180:
log.debug('[Fetch queue]: Re-new harvest object with KEY {} in redis'.format(key))
redis.rpush(get_fetch_routing_key(),
json.dumps({'harvest_object_id': key.split(':')[-1]})
)
@@ -148,14 +150,16 @@ def resubmit_jobs():
# gather queue
harvest_jobs_pending = redis.keys(get_gather_routing_key() + ':*')
for key in harvest_jobs_pending:
redis_key = redis.get(key)
if redis_key is None:
log.info('Gather Queue: Redis cannot get key {}'.format(key))
redis_value = redis.get(key)
if redis_value is None:
log.info('Gather Queue: Redis cannot get value for key {}'.format(key))
continue
date_of_key = datetime.datetime.strptime(
redis_key, "%Y-%m-%d %H:%M:%S.%f")
redis_value, "%Y-%m-%d %H:%M:%S.%f")
log.debug('[Gather queue]: Check key {} with value {}'.format(key, date_of_key))
# 3 hours for a gather
if (datetime.datetime.now() - date_of_key).seconds > 7200:
log.debug('[Gather queue]: Re-new harvest job with KEY {} in redis'.format(key))
redis.rpush(get_gather_routing_key(),
json.dumps({'harvest_job_id': key.split(':')[-1]})
)
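To follow the resubmit logic in isolation: each in-flight item is tracked by a Redis key of the form <routing key>:<id> whose value is a timestamp string, and entries older than the allowed window are pushed back onto the queue list. A rough standalone sketch of that check; the routing key name and sample values are placeholders, not taken from the repository:

import datetime
import json

FETCH_ROUTING_KEY = 'ckan.harvest.fetch'  # placeholder for get_fetch_routing_key()
key = FETCH_ROUTING_KEY + ':some-harvest-object-id'
redis_value = '2021-11-29 10:15:30.123456'  # timestamp stored when the item was picked up

# The stored value is parsed with the same format string the queue code uses.
date_of_key = datetime.datetime.strptime(redis_value, "%Y-%m-%d %H:%M:%S.%f")

# Re-queue the object if it has been pending longer than the allowed window (180 s here),
# mirroring the .seconds comparison in the code above.
if (datetime.datetime.now() - date_of_key).seconds > 180:
    payload = json.dumps({'harvest_object_id': key.split(':')[-1]})
    # In the real code this is redis.rpush(get_fetch_routing_key(), payload)
    print('would re-queue', payload)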
@@ -186,6 +190,8 @@ def resubmit_objects():
log.debug('Re-sent object {} to the fetch queue'.format(object_id))
publisher.send({'harvest_object_id': object_id})

publisher.close()


class Publisher(object):
def __init__(self, connection, channel, exchange, routing_key):
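The only functional change to resubmit_objects() is the added publisher.close() call, which releases the queue connection once the pending objects have been re-sent. A condensed sketch of that acquire/use/release pattern, assuming get_fetch_publisher() as used elsewhere in queue.py; the wrapper function below is illustrative, not part of the module:

from ckanext.harvest import queue

def resend_objects(object_ids):
    # Obtain a publisher for the fetch queue, re-send each harvest object,
    # then close it so the underlying connection is not leaked.
    publisher = queue.get_fetch_publisher()
    try:
        for object_id in object_ids:
            publisher.send({'harvest_object_id': object_id})
    finally:
        # The committed change calls close() as a plain statement at the end of
        # resubmit_objects(); try/finally is used here only to make the pairing explicit.
        publisher.close()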
191 changes: 134 additions & 57 deletions ckanext/harvest/tests/test_queue.py
@@ -6,7 +6,7 @@
import ckanext.harvest.queue as queue
from ckan.plugins.core import SingletonPlugin, implements
import json
import ckan.logic as logic
from ckan.plugins import toolkit
from ckan import model
from ckan.lib.base import config
import uuid
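The test module now resolves actions through the plugins toolkit instead of importing ckan.logic directly; the call pattern is otherwise unchanged. A minimal illustration of the replacement (it needs a configured CKAN test environment to actually run):

from ckan.plugins import toolkit
from ckan import model

# Same lookup the tests perform, now via the stable toolkit interface:
site_user = toolkit.get_action('get_site_user')(
    {'model': model, 'ignore_auth': True}, {}
)['name']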
@@ -44,7 +44,7 @@ def import_stage(self, harvest_object):
assert harvest_object.fetch_finished is not None
assert harvest_object.import_started is not None

user = logic.get_action('get_site_user')(
user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']

@@ -57,7 +57,7 @@ def import_stage(self, harvest_object):
else:
logic_function = 'package_create'

package_dict = logic.get_action(logic_function)(
package_dict = toolkit.get_action(logic_function)(
{'model': model, 'session': model.Session,
'user': user, 'api_version': 3, 'ignore_auth': True},
json.loads(harvest_object.content)
@@ -91,64 +91,25 @@ class TestHarvestQueue(object):

def test_01_basic_harvester(self):

if config.get('ckan.harvest.mq.type') == 'redis':
# make sure that there are no old elements in the redis db
redis = queue.get_connection()
redis.flushdb()

# make sure queues/exchanges are created first and are empty
consumer = queue.get_gather_consumer()
consumer_fetch = queue.get_fetch_consumer()
consumer.queue_purge(queue=queue.get_gather_queue_name())
consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name())

user = logic.get_action('get_site_user')(
user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']

context = {'model': model, 'session': model.Session,
'user': user, 'api_version': 3, 'ignore_auth': True}

source_dict = {
'title': 'Test Source',
'name': 'test-source',
'url': 'basic_test',
'source_type': 'test',
}

harvest_source = logic.get_action('harvest_source_create')(
context,
source_dict
)

assert harvest_source['source_type'] == 'test', harvest_source
assert harvest_source['url'] == 'basic_test', harvest_source

harvest_job = logic.get_action('harvest_job_create')(
context,
{'source_id': harvest_source['id'], 'run': True}
)

job_id = harvest_job['id']

assert harvest_job['source_id'] == harvest_source['id'], harvest_job

assert harvest_job['status'] == u'Running'

assert logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)['status'] == u'Running'

# pop one item off the queue and run the callback
reply = consumer.basic_get(queue='ckan.harvest.gather')

queue.gather_callback(consumer, *reply)

all_objects = model.Session.query(HarvestObject).all()

assert len(all_objects) == 3
assert all_objects[0].state == 'WAITING'
assert all_objects[1].state == 'WAITING'
assert all_objects[2].state == 'WAITING'

assert len(model.Session.query(HarvestObject).all()) == 3
assert len(model.Session.query(HarvestObjectExtra).all()) == 1
harvest_source, job_id = self._create_harvest_job_and_finish_gather_stage(consumer, context)

# do three times as three harvest objects
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
@@ -173,20 +134,20 @@ def test_01_basic_harvester(self):
assert all_objects[2].report_status == 'added'

# fire run again to check if job is set to Finished
logic.get_action('harvest_jobs_run')(
toolkit.get_action('harvest_jobs_run')(
context,
{'source_id': harvest_source['id']}
)

harvest_job = logic.get_action('harvest_job_show')(
harvest_job = toolkit.get_action('harvest_job_show')(
context,
{'id': job_id}
)

assert harvest_job['status'] == u'Finished'
assert harvest_job['stats'] == {'added': 3, 'updated': 0, 'not modified': 0, 'errored': 0, 'deleted': 0}

harvest_source_dict = logic.get_action('harvest_source_show')(
harvest_source_dict = toolkit.get_action('harvest_source_show')(
context,
{'id': harvest_source['id']}
)
@@ -197,13 +158,13 @@ def test_01_basic_harvester(self):
assert harvest_source_dict['status']['job_count'] == 1

# Second run
harvest_job = logic.get_action('harvest_job_create')(
harvest_job = toolkit.get_action('harvest_job_create')(
context,
{'source_id': harvest_source['id'], 'run': True}
)

job_id = harvest_job['id']
assert logic.get_action('harvest_job_show')(
assert toolkit.get_action('harvest_job_show')(
context,
{'id': job_id}
)['status'] == u'Running'
@@ -238,18 +199,18 @@ def test_01_basic_harvester(self):
assert len(all_objects) == 1

# run to make sure job is marked as finished
logic.get_action('harvest_jobs_run')(
toolkit.get_action('harvest_jobs_run')(
context,
{'source_id': harvest_source['id']}
)

harvest_job = logic.get_action('harvest_job_show')(
harvest_job = toolkit.get_action('harvest_job_show')(
context,
{'id': job_id}
)
assert harvest_job['stats'] == {'added': 0, 'updated': 2, 'not modified': 0, 'errored': 0, 'deleted': 1}

harvest_source_dict = logic.get_action('harvest_source_show')(
harvest_source_dict = toolkit.get_action('harvest_source_show')(
context,
{'id': harvest_source['id']}
)
@@ -295,6 +256,122 @@ def test_redis_queue_purging(self):
finally:
redis.delete('ckanext-harvest:some-random-key')

def test_resubmit_objects(self):
'''
Test that only those harvest objects are re-submitted which are not present in the redis fetch queue.
'''
if config.get('ckan.harvest.mq.type') != 'redis':
pytest.skip()
# make sure that there are no old elements in the redis db
redis = queue.get_connection()
fetch_routing_key = queue.get_fetch_routing_key()
redis.flushdb()
try:
# make sure queues/exchanges are created first and are empty
consumer = queue.get_gather_consumer()
consumer_fetch = queue.get_fetch_consumer()
consumer.queue_purge(queue=queue.get_gather_queue_name())
consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name())

user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']

context = {'model': model, 'session': model.Session,
'user': user, 'api_version': 3, 'ignore_auth': True}

harvest_source, job_id = self._create_harvest_job_and_finish_gather_stage(consumer, context)

assert redis.llen(fetch_routing_key) == 3

# do only one time for the first harvest object
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)

count = model.Session.query(model.Package) \
.filter(model.Package.type == 'dataset') \
.count()
assert count == 1

all_objects = model.Session.query(HarvestObject).order_by(HarvestObject.state.asc()).all()
assert len(all_objects) == 3
assert all_objects[0].state == 'COMPLETE'
assert all_objects[0].report_status == 'added'
assert all_objects[0].current is True
assert all_objects[1].state == 'WAITING'
assert all_objects[1].current is False
assert all_objects[2].state == 'WAITING'
assert all_objects[2].current is False

assert len(redis.keys(fetch_routing_key + ':*')) == 0
assert redis.llen(fetch_routing_key) == 2

# Remove one object from redis that should be re-sent to the fetch queue
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
fetch_queue_items = redis.lrange(fetch_routing_key, 0, 10)
assert len(fetch_queue_items) == 1
harvest_object_id = reply[2]
assert fetch_queue_items[0] != harvest_object_id

queue.resubmit_objects()

assert redis.llen(fetch_routing_key) == 2
fetch_queue_items = redis.lrange(fetch_routing_key, 0, 10)
assert harvest_object_id in fetch_queue_items
assert redis.dbsize() == 1
finally:
redis.flushdb()

def _create_harvest_job_and_finish_gather_stage(self, consumer, context):
source_dict = {'title': 'Test Source',
'name': 'test-source',
'url': 'basic_test',
'source_type': 'test'}

try:
harvest_source = toolkit.get_action('harvest_source_create')(
context,
source_dict)
except toolkit.ValidationError:
harvest_source = toolkit.get_action('harvest_source_show')(
context,
{'id': source_dict['name']}
)
pass

assert harvest_source['source_type'] == 'test', harvest_source
assert harvest_source['url'] == 'basic_test', harvest_source

harvest_job = toolkit.get_action('harvest_job_create')(
context,
{'source_id': harvest_source['id'], 'run': True})
job_id = harvest_job['id']

assert harvest_job['source_id'] == harvest_source['id'], harvest_job
assert harvest_job['status'] == u'Running'

assert toolkit.get_action('harvest_job_show')(
context,
{'id': job_id}
)['status'] == u'Running'

# pop one item off the queue and run the callback
reply = consumer.basic_get(queue='ckan.harvest.gather')

queue.gather_callback(consumer, *reply)

all_objects = model.Session.query(HarvestObject).all()

assert len(all_objects) == 3
assert all_objects[0].state == 'WAITING'
assert all_objects[1].state == 'WAITING'
assert all_objects[2].state == 'WAITING'

assert len(model.Session.query(HarvestObject).all()) == 3
assert len(model.Session.query(HarvestObjectExtra).all()) == 1

return harvest_source, job_id


class TestHarvestCorruptRedis(object):

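The new test_resubmit_objects test exercises the scenario where a fetch message is pulled off the Redis list but the harvest object is never processed, and checks that only that lost object is re-sent. A toy, dependency-free illustration of the invariant the test verifies, with plain lists standing in for the database and Redis (the ids and data structures below are made up for the example):

import json

# After one successful fetch: obj-1 is COMPLETE, obj-2 and obj-3 are still WAITING.
waiting_object_ids = ['obj-2', 'obj-3']

# Redis fetch list: obj-3 is still queued; obj-2 was pulled off the list but never processed.
fetch_list = [json.dumps({'harvest_object_id': 'obj-3'})]

queued_ids = {json.loads(item)['harvest_object_id'] for item in fetch_list}
for object_id in waiting_object_ids:
    if object_id not in queued_ids:
        # This is the lost object that resubmit_objects() is meant to repair.
        fetch_list.append(json.dumps({'harvest_object_id': object_id}))

print([json.loads(item)['harvest_object_id'] for item in fetch_list])  # ['obj-3', 'obj-2']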
