diff --git a/apps/questions/models.py b/apps/questions/models.py
index a7459ade00d..bfe2bf20cec 100755
--- a/apps/questions/models.py
+++ b/apps/questions/models.py
@@ -346,7 +346,7 @@ def extract_document(self):
         # answer_votes is the sum of votes for all of the answers.
         answer_votes = 0
-        for ans in self.answers.all():
+        for ans in self.answers.iterator():
             answer_content.append(ans.content)
             has_helpful = has_helpful or bool(ans.num_helpful_votes)
             answer_creator.add(ans.creator.username)
diff --git a/apps/search/es_utils.py b/apps/search/es_utils.py
index 33f0c7c0428..a99bac7efc0 100644
--- a/apps/search/es_utils.py
+++ b/apps/search/es_utils.py
@@ -1,7 +1,6 @@
 from itertools import chain, count, izip
 import logging
 from pprint import pprint
-import time
 
 import elasticutils
 import pyes
@@ -39,66 +38,40 @@ def get_doctype_stats():
     return stats
 
 
-def reindex_model(cls, percent=100):
-    """Reindexes all the objects for a single mode.
+def get_es(**kwargs):
+    """Returns a fresh ES instance.
 
-    Yields number of documents done.
+    Defaults for these arguments come from settings. Specifying them
+    in the function call will override the default.
 
-    Note: This gets run from the command line, so we log stuff to let
-    the user know what's going on.
-
-    :arg cls: the model class
-    :arg percent: The percentage of questions to index. Defaults to
-        100--e.g. all of them.
+    :arg server: settings.ES_HOSTS
+    :arg timeout: settings.ES_INDEXING_TIMEOUT
+    :arg bulk_size: settings.ES_FLUSH_BULK_EVERY
 
     """
-    doc_type = cls._meta.db_table
-    index = cls._get_index()
-
-    start_time = time.time()
-
-    log.info('reindex %s into %s index', doc_type, index)
-
-    es = pyes.ES(settings.ES_HOSTS, timeout=settings.ES_INDEXING_TIMEOUT)
-
-    log.info('setting up mapping....')
-    mapping = cls.get_mapping()
-    es.put_mapping(doc_type, mapping, index)
-
-    log.info('iterating through %s....', doc_type)
-    total = cls.objects.count()
-    to_index = int(total * (percent / 100.0))
-    log.info('total %s: %s (to be indexed: %s)', doc_type, total, to_index)
-    total = to_index
+    defaults = {
+        'server': settings.ES_HOSTS,
+        'timeout': settings.ES_INDEXING_TIMEOUT,
+        'bulk_size': settings.ES_FLUSH_BULK_EVERY
+        }
+    defaults.update(kwargs)
 
-    t = 0
-    for obj in cls.objects.order_by('id').all():
-        t += 1
-        if t % 1000 == 0:
-            time_to_go = (total - t) * ((time.time() - start_time) / t)
-            if time_to_go < 60:
-                time_to_go = "%d secs" % time_to_go
-            else:
-                time_to_go = "%d min" % (time_to_go / 60)
-            log.info('%s/%s... (%s to go)', t, total, time_to_go)
+    return pyes.ES(**defaults)
 
-        if t % settings.ES_FLUSH_BULK_EVERY == 0:
-            es.flush_bulk()
 
-        if t > total:
-            break
+def format_time(time_to_go):
+    """Returns a minutes-and-seconds string for a time given in seconds."""
+    if time_to_go < 60:
+        return "%ds" % time_to_go
+    return "%dm %ds" % (time_to_go / 60, time_to_go % 60)
 
-        cls.index(obj.extract_document(), bulk=True, es=es)
-        yield t
 
-    es.flush_bulk(forced=True)
-    log.info('done!')
-    es.refresh()
-
-
-def es_reindex_with_progress(percent=100):
+def es_reindex_with_progress(doctypes=None, percent=100):
     """Rebuild Elastic indexes as you iterate over yielded progress ratios.
 
+    :arg doctypes: Defaults to None which will index all doctypes.
+        Otherwise indexes the doctypes specified. See
+        :py:func:`.get_doctype_stats` for what doctypes look like.
     :arg percent: Defaults to 100. Allows you to specify how much of
        each doctype you want to index. This is useful for development
        where doing a full reindex takes an hour.
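A quick usage sketch of the two new helpers (reviewer commentary, not part of the patch; the ``timeout=60`` override and the sample durations are illustrative)::

    from search import es_utils

    # All defaults come from settings: ES_HOSTS, ES_INDEXING_TIMEOUT,
    # and ES_FLUSH_BULK_EVERY.
    es = es_utils.get_es()

    # Any keyword argument overrides its settings-derived default.
    es = es_utils.get_es(timeout=60)

    es_utils.format_time(45)   # "45s"
    es_utils.format_time(90)   # "1m 30s"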
@@ -108,24 +81,34 @@ def es_reindex_with_progress(percent=100):
 
     es = elasticutils.get_es()
 
-    # Go through and delete, then recreate the indexes.
-    for index in settings.ES_INDEXES.values():
-        es.delete_index_if_exists(index)
-        es.create_index(index)
-
     search_models = get_search_models()
+    if doctypes:
+        search_models = [cls for cls in search_models
+                         if cls._meta.db_table in doctypes]
+
+    if len(search_models) == len(get_search_models()):
+        index = settings.ES_INDEXES.get('default')
+        if index is not None:
+            # If we're indexing everything and there's a default index
+            # specified in settings, then we delete and recreate it.
+            es.delete_index_if_exists(index)
+            es.create_index(index)
 
     total = sum([cls.objects.count() for cls in search_models])
-    to_index = [reindex_model(cls, percent) for cls in search_models]
+    to_index = [cls.index_all(percent) for cls in search_models]
 
     return (float(done) / total for done, _ in
             izip(count(1), chain(*to_index)))
 
 
-def es_reindex(percent=100):
-    """Rebuild ElasticSearch indexes"""
-    [x for x in es_reindex_with_progress(percent) if False]
+def es_reindex(doctypes=None, percent=100):
+    """Rebuild ElasticSearch indexes.
+
+    See :py:func:`.es_reindex_with_progress` for argument details.
+
+    """
+    # Run the generator to completion; the yielded ratios are unused.
+    [x for x in es_reindex_with_progress(doctypes, percent) if False]
 
 
 def es_whazzup():
diff --git a/apps/search/management/commands/esreindex.py b/apps/search/management/commands/esreindex.py
index 9a155ae7b2a..59dcd4d775b 100644
--- a/apps/search/management/commands/esreindex.py
+++ b/apps/search/management/commands/esreindex.py
@@ -2,6 +2,7 @@
 from django.core.management.base import BaseCommand, CommandError
 from optparse import make_option
 from search.es_utils import es_reindex
+from search.models import get_search_models
 
 
 class Command(BaseCommand):
@@ -15,4 +16,15 @@ def handle(self, *args, **options):
         percent = options['percent']
         if percent > 100 or percent < 1:
             raise CommandError('percent should be between 1 and 100')
-        es_reindex(percent)
+
+        if args:
+            search_models = get_search_models()
+            possible_doctypes = dict((cls._meta.db_table, cls)
+                                     for cls in search_models)
+            for mem in args:
+                if mem not in possible_doctypes:
+                    raise CommandError('"%s" is not a valid doctype (%s)' %
+                                       (mem, possible_doctypes.keys()))
+
+        # args are the list of doctypes to index.
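+        # An empty args tuple is fine: es_reindex treats no doctypes
+        # the same as doctypes=None and reindexes everything.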
+        es_reindex(args, percent)
diff --git a/apps/search/models.py b/apps/search/models.py
index 274c5fc3c62..871f234efaf 100644
--- a/apps/search/models.py
+++ b/apps/search/models.py
@@ -1,14 +1,17 @@
 import elasticutils
 import logging
 import pyes
+import time
 
 from threading import local
 
 from django.conf import settings
 from django.core import signals
+from django.db import reset_queries
 from django.db.models.signals import pre_delete, post_save
 from django.dispatch import receiver
 
 from search.tasks import index_task, unindex_task
+from search import es_utils
 
 log = logging.getLogger('es_search')
 
@@ -78,7 +81,7 @@ def extract_document(self):
         raise NotImplementedError
 
     @classmethod
-    def _get_index(cls):
+    def get_es_index(cls):
         """Returns the index for this class"""
         indexes = settings.ES_INDEXES
         return indexes.get(cls._meta.db_table) or indexes['default']
 
@@ -87,11 +90,93 @@ def index_later(self):
         """Register myself to be indexed at the end of the request."""
         _local_tasks().add((index_task.delay, (self.__class__, (self.id,))))
 
-
     def unindex_later(self):
         """Register myself to be unindexed at the end of the request."""
         _local_tasks().add((unindex_task.delay, (self.__class__, (self.id,))))
 
+    @classmethod
+    def index_all(cls, percent=100):
+        """Reindexes all the objects for this model.
+
+        Yields the number of documents done.
+
+        Note: This can get run from the command line, so we log stuff
+        to let the user know what's going on.
+
+        :arg percent: The percentage of objects to index. Defaults to
+            100--e.g. all of them.
+
+        """
+        es = es_utils.get_es()
+
+        doc_type = cls._meta.db_table
+        index = cls.get_es_index()
+
+        if index != settings.ES_INDEXES.get('default'):
+            # If this doctype isn't using the default index, then this
+            # doctype is responsible for deleting and re-creating the
+            # index.
+            es.delete_index_if_exists(index)
+            es.create_index(index)
+
+        start_time = time.time()
+
+        log.info('reindex %s into %s index', doc_type, index)
+
+        log.info('setting up mapping....')
+        mapping = cls.get_mapping()
+        es.put_mapping(doc_type, mapping, index)
+
+        log.info('iterating through %s....', doc_type)
+        total = cls.objects.count()
+        to_index = int(total * (percent / 100.0))
+        log.info('total %s: %s (to be indexed: %s)', doc_type, total,
+                 to_index)
+        total = to_index
+
+        # Some models have a gazillion instances. So we want to go
+        # through them one at a time in a way that doesn't pull all
+        # the data into memory all at once. So we iterate through ids
+        # and pull the objects one at a time.
+        qs = cls.objects.order_by('id').values_list('id', flat=True)
+
+        for t, obj_id in enumerate(qs.iterator()):
+            if t >= total:
+                break
+
+            obj = cls.objects.get(pk=obj_id)
+
+            if t % 1000 == 0 and t > 0:
+                time_to_go = (total - t) * ((time.time() - start_time) / t)
+                log.info('%s/%s... (%s to go)', t, total,
+                         es_utils.format_time(time_to_go))
+
+                # We call this every 1000 or so because we're
+                # essentially loading the whole db and if DEBUG=True,
+                # then Django saves every SQL statement which causes
+                # our memory to go up up up. So we reset it and that
+                # makes things happier even in DEBUG environments.
+                reset_queries()
+
+            if t % settings.ES_FLUSH_BULK_EVERY == 0:
+                # We built the ES with this setting, but it doesn't
+                # actually do anything with it unless we call
+                # flush_bulk which causes it to check its bulk_size
+                # and flush it if it's too big.
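+                # (At t == 0 the bulk queue is still empty, so the
+                # first of these calls sends nothing.)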
+                es.flush_bulk()
+
+            try:
+                cls.index(obj.extract_document(), bulk=True, es=es)
+            except Exception:
+                log.exception('Unable to extract/index document (id: %d)',
+                              obj.id)
+
+            yield t
+
+        es.flush_bulk(forced=True)
+        end_time = time.time()
+        log.info('done! (%s)', es_utils.format_time(end_time - start_time))
+        es.refresh()
+
     @classmethod
     def index(cls, document, bulk=False, force_insert=False, refresh=False,
               es=None):
@@ -102,7 +187,7 @@ def index(cls, document, bulk=False, force_insert=False, refresh=False,
         if es is None:
             es = elasticutils.get_es()
 
-        index = cls._get_index()
+        index = cls.get_es_index()
         doc_type = cls._meta.db_table
 
         # TODO: handle pyes.urllib3.TimeoutErrors here.
@@ -118,7 +203,7 @@ def unindex(cls, id):
         if not settings.ES_LIVE_INDEXING:
             return
 
-        index = cls._get_index()
+        index = cls.get_es_index()
         doc_type = cls._meta.db_table
         try:
             elasticutils.get_es().delete(index, doc_type, id)
@@ -129,9 +214,11 @@ def unindex(cls, id):
 
 _identity = lambda s: s
 
+
+
 def register_for_indexing(sender_class,
-                         app,
-                         instance_to_indexee=_identity):
+                          app,
+                          instance_to_indexee=_identity):
     """Register a model whose changes might invalidate ElasticSearch indexes.
 
     Specifically, each time an instance of this model is saved or deleted, the
diff --git a/docs/searchchapter.rst b/docs/searchchapter.rst
index bb1436414d2..cbd60b2af40 100644
--- a/docs/searchchapter.rst
+++ b/docs/searchchapter.rst
@@ -188,7 +188,7 @@ Other things you can change:
 
 ``ES_FLUSH_BULK_EVERY``
 
-    Defaults to 1000.
+    Defaults to 100.
 
     We do bulk indexing meaning we queue up a bunch and then push them
     through all at the same time. This requires memory to queue them,
@@ -248,7 +248,7 @@ Do a complete reindexing of everything by::
 
     $ ./manage.py esreindex
 
 This will delete the existing indexes, create new ones, and reindex
-everything in your database. On my machine it takes about > 30 minutes.
+everything in your database. On my machine it takes about an hour.
 
 If you need to get stuff done and don't want to wait for a full
 indexing, you can index a percentage of things.
@@ -263,12 +263,25 @@ This indexes 50% of your data ordered by id::
 
 I use this when I'm fiddling with mappings and the indexing code.
 
+Also, you can index specific doctypes. Doctypes are named after the
+``_meta.db_table`` of the model they map to. At the time of this
+writing, there are three doctypes:
+
+* questions_question
+* wiki_document
+* forums_thread
+
+You can index specific doctypes by naming them on the command line.
+This reindexes just questions::
+
+    $ ./manage.py esreindex questions_question
+
 .. Note::
 
    Once you've indexed everything, you won't have to do it again unless
-   indexing code changes.  The models have post_save and pre_delete hooks
-   that will update the index as the data changes.
+   indexing code changes.  The models have ``post_save`` and ``pre_delete``
+   hooks that will update the index as the data changes.
 
 
 Health/statistics
@@ -278,4 +291,5 @@ You can see Elastic Search statistics/health with::
 
     $ ./manage.py eswhazzup
 
+The last few lines tell you how many documents are in the index by doctype.
 I use this to make sure I've got stuff in my index.
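For context, the ``post_save``/``pre_delete`` hooks the docs mention are wired up per model through ``register_for_indexing``. A sketch of one registration (reviewer commentary, not part of the patch; the ``Question``/``'questions'`` pair is an assumed example)::

    from search.models import register_for_indexing
    from questions.models import Question

    # From here on, saving or deleting a Question queues an index or
    # unindex task at the end of the request.
    register_for_indexing(Question, 'questions')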
diff --git a/settings.py b/settings.py index f629681fa0a..10088061356 100644 --- a/settings.py +++ b/settings.py @@ -598,7 +598,7 @@ def JINJA_CONFIG(): ES_INDEXING_TIMEOUT = 30 # 30 second timeouts for all things indexing # Seconds between updating admin progress bar: ES_REINDEX_PROGRESS_BAR_INTERVAL = 5 -ES_FLUSH_BULK_EVERY = 1000 +ES_FLUSH_BULK_EVERY = 100 # # Connection information for Sphinx search
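To see what the smaller value buys (reviewer commentary, not part of the patch; the host, index name, and documents below are made up): ``get_es()`` hands ``ES_FLUSH_BULK_EVERY`` to pyes as ``bulk_size``, and ``index_all()`` calls ``flush_bulk()`` on the same cadence, so pyes ships a batch once at least that many documents are queued. Dropping the setting from 1000 to 100 means smaller batches and less memory tied up in the queue::

    import pyes

    es = pyes.ES('127.0.0.1:9200', bulk_size=100)

    for i in range(250):
        doc = {'id': i, 'content': 'document %d' % i}
        # bulk=True queues the document instead of sending it immediately.
        es.index(doc, 'sumo', 'questions_question', doc['id'], bulk=True)
        if i % 100 == 0:
            # Only sends if the queue has reached bulk_size.
            es.flush_bulk()

    es.flush_bulk(forced=True)  # push the remainder
    es.refresh()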