[bug 718826, 715932] Make ES indexing less sucky
* the ES connection already has code for forcing bulk, so we don't need to
  repeat that. this changes the code to push the setting to ES.
* this also tweaks the estimation code so that it shows minutes and seconds and
  shows the total delta later. Now I can stop running
  "time ./manage.py esreindex".
* fix esreindex so that you can specify doctypes. This will appropriately
  create/delete indexes so that what you don't want to delete won't get
  deleted.
* adds basic handling for bad data.

  This does a log.exception, but we really should log more than that and/or
  make it more obvious to developers that there's bad data out there.

  In the meantime, this allows us to continue indexing.
* reduced memory usage of indexing by iterating over ids---now it runs on
  my laptop.
* changes _get_index() to get_es_index(). We use it so often it might as
  well be part of the "public API".
* fixed create/delete indexes so that switching doctypes to their own index
  is now just a change in settings---no code changes needed.
* fix DEBUG = True case by resetting queries
* this also adds a bunch of helpful comments, moves reindex_model to
  SearchMixin.index_all, and has some other cosmetic code cleanup.
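
The time estimate described in the list above can be sketched roughly as follows. This is an illustration under assumptions, not the committed code: `estimate_remaining` is a hypothetical helper name, and `format_time` only mirrors the helper this commit adds to `apps/search/es_utils.py`, written with floor division so it behaves the same on Python 3.

```python
def format_time(seconds):
    """Return a minutes-and-seconds string for a duration in seconds."""
    if seconds < 60:
        return "%ds" % seconds
    return "%dm %ds" % (seconds // 60, seconds % 60)


def estimate_remaining(done, total, elapsed):
    """Hypothetical helper: extrapolate remaining seconds from the average pace."""
    if done <= 0:
        raise ValueError("need at least one finished item to estimate")
    return (total - done) * (elapsed / float(done))


print(format_time(45))    # 45s
print(format_time(125))   # 2m 5s
print(format_time(estimate_remaining(1000, 4000, 40.0)))
```

The estimate assumes every document takes about as long as the average so far, so it will drift when document sizes vary a lot.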

End result of this is that indexing doesn't die if it hits bad data, indexing
takes much less memory to run, you can specify specific doctypes to index at
the command line, and the code is better.
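
A minimal sketch of the two ideas summarized above: walk the ids and load one object at a time, and log bad data instead of aborting. It deliberately avoids Django and ES so it runs anywhere; `index_all`, `fetch`, `index_document`, and the `records` dict are stand-ins invented for this example, not names from the codebase.

```python
import logging

log = logging.getLogger("es_search")


def index_all(ids, fetch, index_document):
    """Yield a running count while indexing one object at a time.

    Only the ids are iterated up front; each object is fetched when
    needed, so memory use stays flat no matter how many rows exist.
    """
    for count, obj_id in enumerate(ids, start=1):
        try:
            index_document(fetch(obj_id))
        except Exception:
            # Bad data is logged with a traceback, then indexing continues.
            log.exception("Unable to extract/index document (id: %s)", obj_id)
        yield count


# Stand-in data: id 2 plays the role of a corrupt record.
records = {1: "first", 2: None, 3: "third"}
indexed = []


def index_document(doc):
    if doc is None:
        raise ValueError("bad data")
    indexed.append(doc)


progress = list(index_all(sorted(records), records.get, index_document))
```

The trade-off is one extra query per object in exchange for bounded memory, which is what the commit message reports as making the reindex fit on a laptop.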
willkg committed Jan 18, 2012
1 parent 7c4190d commit 57f4faf
Showing 6 changed files with 168 additions and 72 deletions.
2 changes: 1 addition & 1 deletion apps/questions/models.py
@@ -346,7 +346,7 @@ def extract_document(self):
# answer_votes is the sum of votes for all of the answers.
answer_votes = 0

for ans in self.answers.all():
for ans in self.answers.iterator():
answer_content.append(ans.content)
has_helpful = has_helpful or bool(ans.num_helpful_votes)
answer_creator.add(ans.creator.username)
101 changes: 42 additions & 59 deletions apps/search/es_utils.py
@@ -1,7 +1,6 @@
from itertools import chain, count, izip
import logging
from pprint import pprint
import time

import elasticutils
import pyes
@@ -39,66 +38,40 @@ def get_doctype_stats():
return stats


def reindex_model(cls, percent=100):
"""Reindexes all the objects for a single mode.
def get_es(**kwargs):
"""Returns a fresh ES instance
Yields number of documents done.
Defaults for these arguments come from settings. Specifying them
in the function call will override the default.
Note: This gets run from the command line, so we log stuff to let
the user know what's going on.
:arg cls: the model class
:arg percent: The percentage of questions to index. Defaults to
100--e.g. all of them.
:arg server: settings.ES_HOSTS
:arg timeout: settings.ES_INDEXING_TIMEOUT
:arg bulk_size: settings.ES_FLUSH_BULK_EVERY
"""
doc_type = cls._meta.db_table
index = cls._get_index()

start_time = time.time()

log.info('reindex %s into %s index', doc_type, index)

es = pyes.ES(settings.ES_HOSTS, timeout=settings.ES_INDEXING_TIMEOUT)

log.info('setting up mapping....')
mapping = cls.get_mapping()
es.put_mapping(doc_type, mapping, index)

log.info('iterating through %s....', doc_type)
total = cls.objects.count()
to_index = int(total * (percent / 100.0))
log.info('total %s: %s (to be indexed: %s)', doc_type, total, to_index)
total = to_index
defaults = {
'server': settings.ES_HOSTS,
'timeout': settings.ES_INDEXING_TIMEOUT,
'bulk_size': settings.ES_FLUSH_BULK_EVERY
}
defaults.update(kwargs)

t = 0
for obj in cls.objects.order_by('id').all():
t += 1
if t % 1000 == 0:
time_to_go = (total - t) * ((time.time() - start_time) / t)
if time_to_go < 60:
time_to_go = "%d secs" % time_to_go
else:
time_to_go = "%d min" % (time_to_go / 60)
log.info('%s/%s... (%s to go)', t, total, time_to_go)
return pyes.ES(**defaults)

if t % settings.ES_FLUSH_BULK_EVERY == 0:
es.flush_bulk()

if t > total:
break
def format_time(time_to_go):
"""Returns minutes and seconds string for given time in seconds"""
if time_to_go < 60:
return "%ds" % time_to_go
return "%dm %ds" % (time_to_go / 60, time_to_go % 60)

cls.index(obj.extract_document(), bulk=True, es=es)
yield t

es.flush_bulk(forced=True)
log.info('done!')
es.refresh()


def es_reindex_with_progress(percent=100):
def es_reindex_with_progress(doctypes=None, percent=100):
"""Rebuild Elastic indexes as you iterate over yielded progress ratios.
:arg doctypes: Defaults to None which will index all doctypes.
Otherwise indexes the doctypes specified. See
:py:func:`.get_doctype_stats()` for what doctypes look like.
:arg percent: Defaults to 100. Allows you to specify how much of
each doctype you want to index. This is useful for
development where doing a full reindex takes an hour.
@@ -108,24 +81,34 @@ def es_reindex_with_progress(percent=100):

es = elasticutils.get_es()

# Go through and delete, then recreate the indexes.
for index in settings.ES_INDEXES.values():
es.delete_index_if_exists(index)
es.create_index(index)

search_models = get_search_models()
if doctypes:
search_models = [cls for cls in search_models
if cls._meta.db_table in doctypes]

if len(search_models) == len(get_search_models()):
index = settings.ES_INDEXES.get('default')
if index is not None:
# If we're indexing everything and there's a default index
# specified in settings, then we delete and recreate it.
es.delete_index_if_exists(index)
es.create_index(index)

total = sum([cls.objects.count() for cls in search_models])

to_index = [reindex_model(cls, percent) for cls in search_models]
to_index = [cls.index_all(percent) for cls in search_models]

return (float(done) / total for done, _ in
izip(count(1), chain(*to_index)))


def es_reindex(percent=100):
"""Rebuild ElasticSearch indexes"""
[x for x in es_reindex_with_progress(percent) if False]
def es_reindex(doctypes=None, percent=100):
"""Rebuild ElasticSearch indexes
See :py:func:`.es_reindex_with_progress` for argument details.
"""
[x for x in es_reindex_with_progress(doctypes, percent) if False]


def es_whazzup():
14 changes: 13 additions & 1 deletion apps/search/management/commands/esreindex.py
@@ -2,6 +2,7 @@
from django.core.management.base import BaseCommand, CommandError
from optparse import make_option
from search.es_utils import es_reindex
from search.models import get_search_models


class Command(BaseCommand):
@@ -15,4 +16,15 @@ def handle(self, *args, **options):
percent = options['percent']
if percent > 100 or percent < 1:
raise CommandError('percent should be between 1 and 100')
es_reindex(percent)

if args:
search_models = get_search_models()
possible_doctypes = dict((cls._meta.db_table, cls)
for cls in search_models)
for mem in args:
if mem not in possible_doctypes:
raise CommandError('"%s" is not a valid doctype (%s)' %
(mem, possible_doctypes.keys()))

# args are the list of doctypes to index.
es_reindex(args, percent)
99 changes: 93 additions & 6 deletions apps/search/models.py
@@ -1,14 +1,17 @@
import elasticutils
import logging
import pyes
import time
from threading import local

from django.conf import settings
from django.core import signals
from django.db import reset_queries
from django.db.models.signals import pre_delete, post_save
from django.dispatch import receiver

from search.tasks import index_task, unindex_task
from search import es_utils

log = logging.getLogger('es_search')

@@ -78,7 +81,7 @@ def extract_document(self):
raise NotImplementedError

@classmethod
def _get_index(cls):
def get_es_index(cls):
"""Returns the index for this class"""
indexes = settings.ES_INDEXES
return indexes.get(cls._meta.db_table) or indexes['default']
@@ -87,11 +90,93 @@ def index_later(self):
"""Register myself to be indexed at the end of the request."""
_local_tasks().add((index_task.delay, (self.__class__, (self.id,))))


def unindex_later(self):
"""Register myself to be unindexed at the end of the request."""
_local_tasks().add((unindex_task.delay, (self.__class__, (self.id,))))

@classmethod
def index_all(cls, percent=100):
"""Reindexes all the objects for this model.
Yields number of documents done.
Note: This can get run from the command line, so we log stuff
to let the user know what's going on.
:arg percent: The percentage of objects to index. Defaults to
100--i.e. all of them.
"""
es = es_utils.get_es()

doc_type = cls._meta.db_table
index = cls.get_es_index()

if index != settings.ES_INDEXES.get('default'):
# If this doctype isn't using the default index, then this
# doctype is responsible for deleting and re-creating the
# index.
es.delete_index_if_exists(index)
es.create_index(index)

start_time = time.time()

log.info('reindex %s into %s index', doc_type, index)

log.info('setting up mapping....')
mapping = cls.get_mapping()
es.put_mapping(doc_type, mapping, index)

log.info('iterating through %s....', doc_type)
total = cls.objects.count()
to_index = int(total * (percent / 100.0))
log.info('total %s: %s (to be indexed: %s)', doc_type, total, to_index)
total = to_index

# Some models have a gazillion instances. So we want to go
# through them one at a time in a way that doesn't pull all
# the data into memory all at once. So we iterate through ids
# and pull the objects one at a time.
qs = cls.objects.order_by('id').values_list('id', flat=True)

for t, obj_id in enumerate(qs.iterator()):
if t > total:
break

obj = cls.objects.get(pk=obj_id)

if t % 1000 == 0 and t > 0:
time_to_go = (total - t) * ((time.time() - start_time) / t)
log.info('%s/%s... (%s to go)', t, total,
es_utils.format_time(time_to_go))

# We call this every 1000 or so because we're
# essentially loading the whole db and if DEBUG=True,
# then Django saves every sql statement which causes
# our memory to go up up up. So we reset it and that
# makes things happier even in DEBUG environments.
reset_queries()

if t % settings.ES_FLUSH_BULK_EVERY == 0:
# We built the ES with this setting, but it doesn't
# actually do anything with it unless we call
# flush_bulk which causes it to check its bulk_size
# and flush it if it's too big.
es.flush_bulk()

try:
cls.index(obj.extract_document(), bulk=True, es=es)
except Exception:
log.exception('Unable to extract/index document (id: %d)',
obj.id)

yield t

es.flush_bulk(forced=True)
end_time = time.time()
log.info('done! (%s)', es_utils.format_time(end_time - start_time))
es.refresh()

@classmethod
def index(cls, document, bulk=False, force_insert=False, refresh=False,
es=None):
@@ -102,7 +187,7 @@ def index(cls, document, bulk=False, force_insert=False, refresh=False,
if es is None:
es = elasticutils.get_es()

index = cls._get_index()
index = cls.get_es_index()
doc_type = cls._meta.db_table

# TODO: handle pyes.urllib3.TimeoutErrors here.
@@ -118,7 +203,7 @@ def unindex(cls, id):
if not settings.ES_LIVE_INDEXING:
return

index = cls._get_index()
index = cls.get_es_index()
doc_type = cls._meta.db_table
try:
elasticutils.get_es().delete(index, doc_type, id)
@@ -129,9 +214,11 @@ def unindex(cls, id):


_identity = lambda s: s


def register_for_indexing(sender_class,
app,
instance_to_indexee=_identity):
app,
instance_to_indexee=_identity):
"""Register a model whose changes might invalidate ElasticSearch indexes.
Specifically, each time an instance of this model is saved or deleted, the
22 changes: 18 additions & 4 deletions docs/searchchapter.rst
@@ -188,7 +188,7 @@ Other things you can change:

``ES_FLUSH_BULK_EVERY``

Defaults to 1000.
Defaults to 100.

We do bulk indexing meaning we queue up a bunch and then push them
through all at the same time. This requires memory to queue them,
@@ -248,7 +248,7 @@ Do a complete reindexing of everything by::
$ ./manage.py esreindex

This will delete the existing indexes, create new ones, and reindex
everything in your database. On my machine it takes about > 30 minutes.
everything in your database. On my machine it takes about an hour.

If you need to get stuff done and don't want to wait for a full indexing,
you can index a percentage of things.
@@ -263,12 +263,25 @@ This indexes 50% of your data ordered by id::

I use this when I'm fiddling with mappings and the indexing code.

Also, you can index specific doctypes. Doctypes are named after the
``_meta.db_table`` of the model they map to. At the time of this writing,
there are three doctypes:

* questions_question
* wiki_document
* forums_thread

You can index specific doctypes by specifying the doctypes on the command
line. This reindexes just questions::

$ ./manage.py esreindex questions_question
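
A rough sketch of the doctype check described above, assuming the three doctypes listed at the time of writing. `select_doctypes` is a hypothetical name used only for illustration; the actual command builds the list of valid doctypes from `get_search_models()` and raises `CommandError` rather than `ValueError`.

```python
# Doctypes named in this section; the real list comes from the models.
AVAILABLE = ["questions_question", "wiki_document", "forums_thread"]


def select_doctypes(requested, available=AVAILABLE):
    """Return the doctypes to reindex, rejecting names that do not exist."""
    unknown = [name for name in requested if name not in available]
    if unknown:
        raise ValueError('"%s" is not a valid doctype (%s)'
                         % (unknown[0], ", ".join(available)))
    if not requested:
        # No doctypes on the command line means reindex everything.
        return list(available)
    return [name for name in available if name in requested]


everything = select_doctypes([])
just_questions = select_doctypes(["questions_question"])
```

Validating the names first means a typo on the command line fails immediately instead of after an index has already been deleted.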


.. Note::

Once you've indexed everything, you won't have to do it again unless
indexing code changes. The models have post_save and pre_delete hooks
that will update the index as the data changes.
indexing code changes. The models have ``post_save`` and ``pre_delete``
hooks that will update the index as the data changes.


Health/statistics
@@ -278,4 +291,5 @@ You can see Elastic Search statistics/health with::

$ ./manage.py eswhazzup

The last few lines tell you how many documents are in the index by doctype.
I use this to make sure I've got stuff in my index.
2 changes: 1 addition & 1 deletion settings.py
@@ -598,7 +598,7 @@ def JINJA_CONFIG():
ES_INDEXING_TIMEOUT = 30 # 30 second timeouts for all things indexing
# Seconds between updating admin progress bar:
ES_REINDEX_PROGRESS_BAR_INTERVAL = 5
ES_FLUSH_BULK_EVERY = 1000
ES_FLUSH_BULK_EVERY = 100

#
# Connection information for Sphinx search
