From fac53946dea9901e2d22a1a9b335d036f02d73c0 Mon Sep 17 00:00:00 2001
From: Don Naegely
Date: Sat, 13 Jun 2015 11:42:04 -0700
Subject: [PATCH 1/2] Add result retrieval and jobs API

Signed-off-by: Don Naegely
---
 .travis.yml | 3 +-
 avocado/async/__init__.py | 0
 avocado/async/utils.py | 97 ++++++++++++++
 avocado/conf/global_settings.py | 3 +
 avocado/query/utils.py | 190 ++++++++++++++++++++++++++++-
 requirements.txt | 1 +
 setup.py | 1 +
 tests/cases/query/tests/utils.py | 199 ++++++++++++++++++++++++++++++-
 tests/processors.py | 13 ++
 tests/settings.py | 20 +++-
 10 files changed, 520 insertions(+), 7 deletions(-)
 create mode 100644 avocado/async/__init__.py
 create mode 100644 avocado/async/utils.py
 create mode 100644 tests/processors.py

diff --git a/.travis.yml b/.travis.yml
index 6ca51b3..815057d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -13,6 +13,7 @@ env:
 services:
   - memcached
+  - redis-server
 
 addons:
   - postgres
 
@@ -40,4 +41,4 @@ after_success:
 matrix:
   exclude:
     - python: "2.6"
-      env: DJANGO=1.7.6 POSTGRES_TEST_USER=postgres POSTGRES_TEST_NAME=avocado MYSQL_TEST_USER=root MYSQL_TEST_NAME=avocado
\ No newline at end of file
+      env: DJANGO=1.7.6 POSTGRES_TEST_USER=postgres POSTGRES_TEST_NAME=avocado MYSQL_TEST_USER=root MYSQL_TEST_NAME=avocado
diff --git a/avocado/async/__init__.py b/avocado/async/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/avocado/async/utils.py b/avocado/async/utils.py
new file mode 100644
index 0000000..3ac8081
--- /dev/null
+++ b/avocado/async/utils.py
@@ -0,0 +1,97 @@
+from django_rq import get_worker, get_queue
+
+from avocado.conf import settings
+from avocado.query import utils
+
+
+def run_jobs():
+    """
+    Execute all the pending jobs.
+    """
+    get_worker(settings.ASYNC_QUEUE).work(burst=True)
+
+
+def get_job(job_id):
+    """
+    Return the job for the specified ID or None if it cannot be found.
+
+    Args:
+        job_id(uuid): The ID of the job to retrieve.
+
+    Returns:
+        The job with the matching ID or None if no job with the supplied job
+        ID could be found.
+    """
+    queue = get_queue(settings.ASYNC_QUEUE)
+    return queue.fetch_job(job_id)
+
+
+def get_job_count():
+    """
+    Returns the current number of jobs in the queue.
+    """
+    return get_queue(settings.ASYNC_QUEUE).count
+
+
+def get_job_result(job_id):
+    """
+    Returns the result of the job with the supplied ID.
+
+    If the job could not be found or the job is not finished yet, None will
+    be returned as the job result.
+
+    Args:
+        job_id(uuid): The ID of the job to retrieve the result for.
+
+    Returns:
+        The result of the job with the matching ID or None if the job could
+        not be found or is not finished.
+    """
+    job = get_job(job_id)
+
+    if job is None:
+        return None
+
+    return job.result
+
+
+def get_jobs():
+    """
+    Returns a collection of all the pending jobs.
+    """
+    return get_queue(settings.ASYNC_QUEUE).jobs
+
+
+def cancel_job(job_id):
+    """
+    Cancel the job and its associated query if they exist.
+
+    Args:
+        job_id(uuid): The ID of the job to cancel.
+
+    Returns:
+        The cancellation result of the job's query if it had one. If the job
+        could not be found or the job had no query, this method returns None.
+    """
+    job = get_job(job_id)
+
+    if job is None:
+        return None
+
+    result = None
+    query_name = job.meta.get('query_name')
+    if query_name:
+        canceled = utils.cancel_query(query_name)
+        result = {
+            'canceled': canceled
+        }
+
+    job.cancel()
+    return result
+
+
+def cancel_all_jobs():
+    """
+    Cancels all jobs.
+ """ + get_queue(settings.ASYNC_QUEUE).empty() diff --git a/avocado/conf/global_settings.py b/avocado/conf/global_settings.py index 37996e9..2a9a4ff 100644 --- a/avocado/conf/global_settings.py +++ b/avocado/conf/global_settings.py @@ -120,3 +120,6 @@ # the ad-hoc queries built from a context and view. DATA_CACHE = 'default' QUERY_CACHE = 'default' + +# Name of the queue to use for scheduling and working on async jobs. +ASYNC_QUEUE = 'avocado' diff --git a/avocado/query/utils.py b/avocado/query/utils.py index 4c0e6ff..724baa7 100644 --- a/avocado/query/utils.py +++ b/avocado/query/utils.py @@ -1,12 +1,18 @@ import logging + import django -from django.db import connections, DEFAULT_DB_ALIAS, DatabaseError from django.core.cache import get_cache +from django.db import connections, DEFAULT_DB_ALIAS, DatabaseError +from django_rq import get_queue + from avocado.conf import settings +from avocado.export import HTMLExporter, registry as exporters +from avocado.query import pipeline -logger = logging.getLogger(__name__) +logger = logging.getLogger(__name__) +DEFAULT_LIMIT = 20 TEMP_DB_ALIAS_PREFIX = '_db:{0}' @@ -184,3 +190,183 @@ def _cancel_query(name, db, pid): return True logger.warn('canceling queries for {0} is not supported'.format(engine)) + + +def get_exporter_class(export_type): + """ + Returns the exporter class for the supplied export type name. + + Args: + export_type(string): The string name of the exporter. + + Returns: + The exporter class for the supplied exporter_type as defined in + the exporters registry. See avocado.export.registry for more info. + """ + if export_type.lower() == 'html': + return HTMLExporter + + return exporters[export_type] + + +def async_get_result_rows(context, view, query_options, job_options=None): + """ + Creates a new job to asynchronously get result rows and returns the job ID. + + Args: + See get_result_rows argument list. + + Keyword Arugments: + Set as properties on the returned job's meta. + + Returns: + The ID of the created job. + """ + if not job_options: + job_options = {} + + queue = get_queue(settings.ASYNC_QUEUE) + job = queue.enqueue(get_result_rows, + context, + view, + query_options, + evaluate_rows=True) + job.meta.update(job_options) + job.save() + + return job.id + + +def get_result_rows(context, view, query_options, evaluate_rows=False): + """ + Returns the result rows and options given the supplied arguments. + + The options include the exporter, queryset, offset, limit, page, and + stop_page that were used when calculating the result rows. These can give + some more context to callers of this method as far as the returned row + set is concerned. + + Args: + context (DataContext): Context for the query processor + view (DataView): View for the query processor + query_options (dict): Options for the query and result rows slice. + These options include: + * page: Start page of the result row slice. + * limit: Upper bound on number of result rows returned. + * stop_page: Stop page of result row slice. + * query_name: Query name used when isolating result row query. + * processor: Processor to use to generate queryset. + * tree: Modeltree to pass to QueryProcessor. + * export_type: Export type to use for result rows. + * reader: Reader type to use when exporting, see + export._base.BaseExporter.readers for available readers. + + Kwargs: + evaluate_rows (default=False): When this is True, the generator + returned from the read method of the exporter will be evaluated + and all results will be stored in a list. 
+            caller of this method actually needs an evaluated result set. An
+            example of this is calling this method asynchronously, which
+            needs a pickleable return value (generators can't be pickled).
+
+    Returns:
+        dict -- Result rows and relevant options used to calculate rows. These
+        options include:
+            * exporter: The exporter used.
+            * limit: The limit on the number of result rows.
+            * offset: The starting offset of the result rows.
+            * page: The starting page number of the result rows.
+            * queryset: The queryset used to gather results.
+            * rows: The result rows themselves.
+            * stop_page: The stop page of the result rows collection.
+
+    """
+    offset = None
+
+    page = query_options.get('page')
+    limit = query_options.get('limit') or 0
+    stop_page = query_options.get('stop_page')
+    query_name = query_options.get('query_name')
+    processor_name = query_options.get('processor') or 'default'
+    tree = query_options.get('tree')
+    export_type = query_options.get('export_type') or 'html'
+    reader = query_options.get('reader')
+
+    if page is not None:
+        page = int(page)
+
+        # Pages are 1-based.
+        if page < 1:
+            raise ValueError('Page must be greater than or equal to 1.')
+
+        # Change to 0-base for calculating offset.
+        offset = limit * (page - 1)
+
+        if stop_page:
+            stop_page = int(stop_page)
+
+            # Cannot have a lower index stop page than start page.
+            if stop_page < page:
+                raise ValueError(
+                    'Stop page must be greater than or equal to start page.')
+
+            # 4...5 means pages 4 and 5 inclusive, not everything up to 5
+            # like with list slices, so 4...4 is equivalent to just 4.
+            if stop_page > page:
+                limit = limit * (stop_page - page + 1)
+    else:
+        # When no page or range is specified, the limit does not apply.
+        limit = None
+
+    QueryProcessor = pipeline.query_processors[processor_name]
+    processor = QueryProcessor(context=context, view=view, tree=tree)
+    queryset = processor.get_queryset()
+
+    # Isolate this query to a named connection. This will cancel any
+    # outstanding query of the same name if one is present.
+    cancel_query(query_name)
+    queryset = isolate_queryset(query_name, queryset)
+
+    # A limit of 0 means all rows for pagination purposes; however, the
+    # read method requires an explicit limit of None in that case.
+    limit = limit or None
+
+    # We use HTMLExporter in Serrano but Avocado has it disabled. Until it
+    # is enabled in Avocado, we can reference the HTMLExporter directly here.
+    exporter = processor.get_exporter(get_exporter_class(export_type))
+
+    # This is an optimization when concepts are selected for ordering
+    # only. There is no guarantee to how many rows are required to get
+    # the desired `limit` of rows, so the query is unbounded. If all
+    # ordering facets are visible, the limit and offset can be pushed
+    # down to the query.
+    order_only = lambda f: not f.get('visible', True)
+    view_node = view.parse()
+
+    if filter(order_only, view_node.facets):
+        iterable = processor.get_iterable(queryset=queryset)
+        rows = exporter.manual_read(iterable,
+                                    offset=offset,
+                                    limit=limit)
+    else:
+        iterable = processor.get_iterable(queryset=queryset,
+                                          limit=limit,
+                                          offset=offset)
+        method = exporter.reader(reader)
+        rows = method(iterable)
+
+    if evaluate_rows:
+        rows = list(rows)
+
+    return {
+        'context': context,
+        'export_type': export_type,
+        'limit': limit,
+        'offset': offset,
+        'page': page,
+        'processor': processor,
+        'queryset': queryset,
+        'rows': rows,
+        'stop_page': stop_page,
+        'view': view,
+    }
diff --git a/requirements.txt b/requirements.txt
index d448134..bf1e0da 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,3 +11,4 @@ ordereddict
 sqlparse
 mysql-python
 psycopg2
+django_rq
diff --git a/setup.py b/setup.py
index d8b3baa..b6d0ca9 100644
--- a/setup.py
+++ b/setup.py
@@ -7,6 +7,7 @@
     'modeltree>=1.1.9',
     'South==1.0.2',
     'jsonfield==1.0.0',
+    'django_rq',
 ]
 
 if sys.version_info < (2, 7):
diff --git a/tests/cases/query/tests/utils.py b/tests/cases/query/tests/utils.py
index b177c84..82bf5a4 100644
--- a/tests/cases/query/tests/utils.py
+++ b/tests/cases/query/tests/utils.py
@@ -1,11 +1,17 @@
 import time
 from threading import Thread
+
+from django.conf import settings
+from django.core import management
 from django.db import connections, DatabaseError
 from django.test import TransactionTestCase
-from django.core import management
-from django.conf import settings
-from tests.models import Employee
+from rq.job import JobStatus
+
+from avocado.async import utils as async_utils
+from avocado.models import DataContext, DataField, DataView
 from avocado.query import utils
+from tests.models import Employee
+from tests.processors import ManagerQueryProcessor
 
 
 class TempConnTest(TransactionTestCase):
@@ -135,3 +141,190 @@ def stopper(t, db, name):
             t.assertTrue(canceled)
 
         self.run_cancel_test(runner, stopper)
+
+
+class AsyncResultRowTestCase(TransactionTestCase):
+    fixtures = ['tests/fixtures/employee_data.json']
+
+    def setUp(self):
+        management.call_command('avocado', 'init', 'tests', quiet=True)
+        # Don't start with any jobs in the queue.
+        async_utils.cancel_all_jobs()
+
+    def tearDown(self):
+        # Don't leave any jobs in the queue.
+        async_utils.cancel_all_jobs()
+
+    def test_create_and_cancel(self):
+        # Create 3 meaningless jobs. We're just testing job setup and
+        # cancellation here, not the execution.
+        utils.async_get_result_rows(None, None, {})
+        job_options = {
+            'name': 'Job X',
+        }
+        job_x_id = utils.async_get_result_rows(None, None, {}, job_options)
+        job_options = {
+            'name': 'Job Y',
+            'query_name': 'job_y_query',
+        }
+        job_y_id = utils.async_get_result_rows(None, None, {}, job_options)
+
+        self.assertEqual(async_utils.get_job_count(), 3)
+
+        jobs = async_utils.get_jobs()
+        self.assertEqual(len(jobs), 3)
+
+        job_x = async_utils.get_job(job_x_id)
+        self.assertTrue(job_x in jobs)
+        self.assertEqual(job_x.meta['name'], 'Job X')
+
+        self.assertEqual(async_utils.cancel_job(job_x_id), None)
+        self.assertEqual(async_utils.get_job_count(), 2)
+        async_utils.cancel_job('invalid_id')
+        self.assertEqual(async_utils.get_job_count(), 2)
+
+        self.assertTrue('canceled' in async_utils.cancel_job(job_y_id))
+        self.assertEqual(async_utils.get_job_count(), 1)
+
+        async_utils.cancel_all_jobs()
+        self.assertEqual(async_utils.get_job_count(), 0)
+
+    def test_job_result(self):
+        context = DataContext()
+        view = DataView()
+        limit = 3
+        query_options = {
+            'limit': limit,
+            'page': 1,
+        }
+
+        job_id = utils.async_get_result_rows(context, view, query_options)
+        self.assertEqual(async_utils.get_job_count(), 1)
+        async_utils.run_jobs()
+        time.sleep(1)
+        result = async_utils.get_job_result(job_id)
+        self.assertEqual(async_utils.get_job(job_id).status,
+                         JobStatus.FINISHED)
+        self.assertEqual(len(result['rows']), limit)
+        self.assertEqual(result['limit'], limit)
+
+    def test_invalid_job_result(self):
+        context = DataContext()
+        view = DataView()
+        query_options = {
+            'page': 0,
+        }
+
+        job_id = utils.async_get_result_rows(context, view, query_options)
+        self.assertEqual(async_utils.get_job_count(), 1)
+        async_utils.run_jobs()
+        time.sleep(1)
+        self.assertEqual(async_utils.get_job_result(job_id), None)
+        self.assertEqual(async_utils.get_job(job_id).status, JobStatus.FAILED)
+
+
+class ResultRowTestCase(TransactionTestCase):
+    fixtures = ['tests/fixtures/employee_data.json']
+
+    def setUp(self):
+        management.call_command('avocado', 'init', 'tests', quiet=True)
+
+    def test_invalid_options(self):
+        # Page numbers less than 1 should not be allowed.
+        query_options = {
+            'page': 0,
+        }
+        self.assertRaises(ValueError,
+                          utils.get_result_rows,
+                          None,
+                          None,
+                          query_options)
+
+        # Stop pages before start pages should not be allowed.
+        query_options = {
+            'page': 5,
+            'stop_page': 1,
+        }
+        self.assertRaises(ValueError,
+                          utils.get_result_rows,
+                          None,
+                          None,
+                          query_options)
+
+    def test_get_rows(self):
+        context = DataContext()
+        view = DataView()
+
+        # Unless we tell the function to evaluate the rows, it should return
+        # rows as a generator, so we need to explicitly evaluate it here.
+        result = utils.get_result_rows(context, view, {})
+        self.assertEqual(len(list(result['rows'])), Employee.objects.count())
+
+        # Now, have the method evaluate the rows.
+        result = utils.get_result_rows(context, view, {}, evaluate_rows=True)
+        self.assertEqual(len(result['rows']), Employee.objects.count())
+
+    def test_get_order_only(self):
+        field = DataField.objects.get(field_name='salary')
+        concept = field.concepts.all()[0]
+
+        context = DataContext()
+        view = DataView(json=[{
+            'concept': concept.pk,
+            'visible': False,
+            'sort': 'desc',
+        }])
+        result = utils.get_result_rows(context, view, {})
+        self.assertEqual(len(list(result['rows'])), Employee.objects.count())
+
+    def test_limit(self):
+        context = DataContext()
+        view = DataView()
+        limit = 2
+        query_options = {
+            'limit': limit,
+            'page': 1,
+        }
+        result = utils.get_result_rows(context, view, query_options)
+        self.assertEqual(len(list(result['rows'])), limit)
+        self.assertEqual(result['limit'], limit)
+
+    def test_processor(self):
+        context = DataContext()
+        view = DataView()
+        processor = 'manager'
+        query_options = {
+            'processor': processor,
+        }
+        result = utils.get_result_rows(context, view, query_options)
+        self.assertEqual(len(list(result['rows'])),
+                         Employee.objects.filter(is_manager=True).count())
+        self.assertTrue(isinstance(result['processor'], ManagerQueryProcessor))
+
+    def test_export_type(self):
+        context = DataContext()
+        view = DataView()
+        export_type = 'json'
+        query_options = {
+            'export_type': export_type,
+        }
+        result = utils.get_result_rows(context, view, query_options)
+        self.assertEqual(len(list(result['rows'])), Employee.objects.count())
+        self.assertEqual(result['export_type'], export_type)
+
+    def test_pages(self):
+        context = DataContext()
+        view = DataView()
+        query_options = {
+            'page': 1,
+            'stop_page': 10,
+        }
+        result = utils.get_result_rows(context, view, query_options)
+        self.assertEqual(len(list(result['rows'])), Employee.objects.count())
+
+        query_options = {
+            'page': 1,
+            'stop_page': 1,
+        }
+        result = utils.get_result_rows(context, view, query_options)
+        self.assertEqual(len(list(result['rows'])), Employee.objects.count())
diff --git a/tests/processors.py b/tests/processors.py
new file mode 100644
index 0000000..27de052
--- /dev/null
+++ b/tests/processors.py
@@ -0,0 +1,13 @@
+from avocado.models import DataContext
+from avocado.query.pipeline import QueryProcessor
+
+
+class ManagerQueryProcessor(QueryProcessor):
+    def __init__(self, *args, **kwargs):
+        kwargs['context'] = DataContext(json={
+            'field': 'tests.employee.is_manager',
+            'operator': 'exact',
+            'value': True,
+        })
+
+        super(ManagerQueryProcessor, self).__init__(*args, **kwargs)
diff --git a/tests/settings.py b/tests/settings.py
index 6c91dce..25e7b1c 100644
--- a/tests/settings.py
+++ b/tests/settings.py
@@ -78,6 +78,7 @@
     'django.contrib.sites',
     'django.contrib.auth',
     'django.contrib.contenttypes',
+    'django_rq',
     'haystack',
     'guardian',
 )
@@ -142,17 +143,26 @@
             'handlers': ['null'],
             'level': 'DEBUG',
             'propagate': True,
-        }
+        },
+        'rq.worker': {
+            'handlers': ['null'],
+            'level': 'DEBUG',
+        },
     }
 }
 
 SOUTH_TESTS_MIGRATE = False
 
+AVOCADO_QUEUE_NAME = 'avocado_test_queue'
 AVOCADO = {
     'HISTORY_ENABLED': False,
     'HISTORY_MAX_SIZE': 50,
     'METADATA_MIGRATION_APP': 'core',
     'DATA_CACHE_ENABLED': False,
+    'QUERY_PROCESSORS': {
+        'manager': 'tests.processors.ManagerQueryProcessor',
+    },
+    'ASYNC_QUEUE': AVOCADO_QUEUE_NAME,
 }
 
 MODELTREES = {
@@ -170,3 +180,11 @@
 MIDDLEWARE_CLASSES = ()
 
 SECRET_KEY = 'acb123'
+
+RQ_QUEUES = {
+    AVOCADO_QUEUE_NAME: {
+        'HOST': 'localhost',
+        'PORT': 6379,
+        'DB': 0,
+    },
+}

From a8cdc2211b40bc236a92f6b1db5f0b7cc402b7bc Mon Sep 17 00:00:00 2001
From: Don Naegely
Date: Sat, 27 Jun 2015 13:57:17 -0700
Subject: [PATCH 2/2] Exclude */south_migrations/* from coverage

Signed-off-by: Don Naegely
---
 .coveragerc | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.coveragerc b/.coveragerc
index d93c55f..b9f1270 100644
--- a/.coveragerc
+++ b/.coveragerc
@@ -3,6 +3,7 @@
 branch = True
 source = avocado
 omit =
     */admin.py
+    */south_migrations/*
     */migrations/*
     avocado/query/parsers/__init__.py
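
A note for reviewers: end to end, the new API is exercised roughly as below.
This is a minimal usage sketch mirroring the flow in AsyncResultRowTestCase;
the empty DataContext/DataView and the job and query names are placeholders,
not part of the patch.

    from avocado.async import utils as async_utils
    from avocado.models import DataContext, DataView
    from avocado.query import utils

    # Enqueue a job that computes the first page of results (10 rows).
    # Passing 'query_name' lets cancel_job() also cancel the isolated
    # database query backing the job.
    job_id = utils.async_get_result_rows(
        DataContext(), DataView(),
        {'page': 1, 'limit': 10},
        {'name': 'Example job', 'query_name': 'example_query'})

    # A deployment would run an RQ worker against the configured queue;
    # run_jobs() drains it synchronously with a burst worker instead.
    async_utils.run_jobs()

    # None until the job finishes (or if the ID is unknown); afterwards, the
    # dict returned by get_result_rows() with 'rows' already evaluated.
    result = async_utils.get_job_result(job_id)
    if result is not None:
        print('%d rows for page %s' % (len(result['rows']), result['page']))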
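
The offset/limit bookkeeping in get_result_rows() is easier to see in
isolation. The helper below is a hypothetical distillation of just that
arithmetic (it is not part of the patch), using the inclusive page-range
semantics described by the "4...5" comment:

    def page_slice(page, stop_page, limit):
        """Return (offset, limit) for an inclusive, 1-based page range."""
        if page is None:
            # No page given: the limit does not apply.
            return None, None
        if page < 1:
            raise ValueError('Page must be greater than or equal to 1.')
        offset = limit * (page - 1)
        if stop_page and stop_page < page:
            raise ValueError(
                'Stop page must be greater than or equal to start page.')
        if stop_page and stop_page > page:
            limit = limit * (stop_page - page + 1)
        # A limit of 0 means unbounded, expressed as None for the readers.
        return offset, limit or None

    assert page_slice(1, None, 20) == (0, 20)   # page 1 -> rows 0..19
    assert page_slice(4, 5, 10) == (30, 20)     # pages 4..5 -> rows 30..49
    assert page_slice(1, 10, 0) == (0, None)    # limit 0 -> all rows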
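
Finally, wiring this up in a consuming project mirrors what tests/settings.py
does. A minimal settings sketch (the host, port, and queue name are
assumptions; only the keys relevant to this patch are shown):

    INSTALLED_APPS += ('django_rq',)

    AVOCADO = {
        # Must match a key in RQ_QUEUES below.
        'ASYNC_QUEUE': 'avocado',
    }

    RQ_QUEUES = {
        'avocado': {
            'HOST': 'localhost',
            'PORT': 6379,
            'DB': 0,
        },
    }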