Commit

Merge pull request #103 from 4dn-dcic/defer_work
1.2.2 -- Indexer fixes related to KeyError and MissingIndexItemException
carlvitzthum authored Aug 22, 2019
2 parents 48b2eea + 27144bc commit f2d173b
Showing 15 changed files with 124 additions and 90 deletions.
1 change: 0 additions & 1 deletion buildout.cfg
@@ -75,7 +75,6 @@ output = ${buildout:directory}/production.ini
accession_factory = snowflakes.server_defaults.test_accession
file_upload_bucket = snowflakes-files-dev
blob_bucket = snovault-blobs-dev
indexer_processes =

[production]
recipe = collective.recipe.modwsgi
8 changes: 0 additions & 8 deletions candidate.cfg

This file was deleted.

1 change: 0 additions & 1 deletion development.ini
@@ -10,7 +10,6 @@ load_test_only = true
create_tables = true
testing = true
postgresql.statement_timeout = 20
indexer.processes =
should_index = true
elasticsearch.aws_auth = false

1 change: 0 additions & 1 deletion production.ini.in
@@ -6,7 +6,6 @@ file_upload_bucket = ${file_upload_bucket}
blob_bucket = ${blob_bucket}
blob_store_profile_name = encoded-files-upload
accession_factory = ${accession_factory}
indexer.processes = ${indexer_processes}
elasticsearch.aws_auth = true

[composite:indexer]
5 changes: 4 additions & 1 deletion setup.py
@@ -6,6 +6,9 @@
README = open(os.path.join(here, 'README.rst')).read()
CHANGES = open(os.path.join(here, 'CHANGES.rst')).read()

version_path = os.path.join(here, "src/snovault/_version.py")
this_version = open(version_path).readlines()[-1].split()[-1].strip("\"'")


requires = [
'Pillow',
@@ -64,7 +67,7 @@

setup(
name='snovault',
version='1.2.1',
version=this_version,
description='Snovault Hybrid Object Relational Database Framework',
long_description=README + '\n\n' + CHANGES,
packages=find_packages('src'),
4 changes: 4 additions & 0 deletions src/snovault/_version.py
@@ -0,0 +1,4 @@
"""Version information."""

# The following line *must* be the last in the module, exactly as formatted:
__version__ = "1.2.2"
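A note on the setup.py change above: the package version is now read from the last line of _version.py rather than hard-coded. A minimal sketch of that extraction, assuming the file layout shown in this diff (paths are illustrative):

    import os

    here = os.path.dirname(os.path.abspath(__file__))
    version_path = os.path.join(here, "src/snovault/_version.py")

    # the last line of _version.py must be exactly: __version__ = "1.2.2"
    with open(version_path) as f:
        last_line = f.readlines()[-1]          # '__version__ = "1.2.2"\n'
    this_version = last_line.split()[-1].strip("\"'")
    print(this_version)                        # -> 1.2.2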
13 changes: 12 additions & 1 deletion src/snovault/elasticsearch/es_index_listener.py
@@ -6,7 +6,10 @@
"""

from webtest import TestApp
from snovault.elasticsearch import ELASTIC_SEARCH
from snovault.elasticsearch.interfaces import (
ELASTIC_SEARCH,
INDEXER_QUEUE
)
import atexit
import datetime
import elasticsearch.exceptions
@@ -47,8 +50,16 @@ def run(testapp, interval=DEFAULT_INTERVAL, dry_run=False, path='/index', update
es = testapp.app.registry[ELASTIC_SEARCH]
es.info()

queue = testapp.app.registry[INDEXER_QUEUE]

# main listening loop
while True:
# if no messages to index, skip the /index call. Counts are approximate
queue_counts = queue.number_of_messages()
if (not queue_counts['primary_waiting'] and not queue_counts['secondary_waiting']):
time.sleep(interval)
continue

try:
res = testapp.post_json(path, {
'record': True,
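The effect of the listener change above is that /index is only POSTed when the queue reports waiting messages. A minimal sketch of that polling pattern, assuming queue and testapp objects like those in the diff (the count keys follow the diff; everything else is illustrative):

    import time

    def poll_and_index(testapp, queue, interval=60, path='/index'):
        while True:
            # counts are approximate; skip the /index call when nothing is waiting
            counts = queue.number_of_messages()
            if not counts['primary_waiting'] and not counts['secondary_waiting']:
                time.sleep(interval)
                continue
            testapp.post_json(path, {'record': True})
            time.sleep(interval)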
11 changes: 7 additions & 4 deletions src/snovault/elasticsearch/esstorage.py
@@ -157,21 +157,24 @@ def purge_uuid(self, rid, item_type=None):
Attempt to purge an item by given resource id (rid), completely
removing it from ES and DB.
"""
if not item_type: # ES deletion requires index & doc_type, which are both == item_type
model = self.get_by_uuid(rid)
model = self.get_by_uuid(rid)
# ES deletion requires index & doc_type, which are both == item_type
if not item_type:
item_type = model.item_type
max_sid = model.max_sid
uuids_linking_to_item = self.find_uuids_linked_to_item(rid)
if len(uuids_linking_to_item) > 0:
raise HTTPLocked(detail="Cannot purge item as other items still link to it",
comment=uuids_linking_to_item)
log.error('PURGE: purging %s' % rid)
log.warning('PURGE: purging %s' % rid)

# delete the item from DB
self.write.purge_uuid(rid)
# delete the item from ES and also the mirrored ES if present
self.read.purge_uuid(rid, item_type, self.registry)
# queue related items for reindexing
self.registry[INDEXER].find_and_queue_secondary_items(set([rid]), set())
self.registry[INDEXER].find_and_queue_secondary_items(set([rid]), set(),
sid=max_sid)

def get_rev_links(self, model, rel, *item_types):
return self.storage().get_rev_links(model, rel, *item_types)
99 changes: 71 additions & 28 deletions src/snovault/elasticsearch/indexer.py
@@ -3,7 +3,6 @@
ConnectionError,
TransportError,
)
from ..indexing_views import SidException
from ..embed import MissingIndexItemException
from pyramid.view import view_config
from urllib3.exceptions import ReadTimeoutError
@@ -14,6 +13,7 @@
)
from snovault import (
DBSESSION,
STORAGE
)
from .indexer_utils import find_uuids_for_indexing
import datetime
@@ -33,6 +33,35 @@ def includeme(config):
registry[INDEXER] = Indexer(registry)


# really simple exception to know when the sid check fails
class SidException(Exception):
pass


def check_sid(sid, max_sid):
"""
Compare a given sid to the given max_sid.
Raise an exception if either value is malformed or if sid is greater than max_sid.
Args:
sid (int): query sid
max_sid (int): maximum sid to compare to
Raises:
ValueError: if sid or max_sid are not valid
SidException: if sid in request is greater than max sid
"""
try:
sid = int(sid)
max_sid = int(max_sid)
except ValueError:
raise ValueError('sid (%s) and max sid (%s) must be integers.'
% (sid, max_sid))
if max_sid < sid:
raise SidException('Query sid (%s) is greater than max sid (%s).'
% (sid, max_sid))


@view_config(route_name='index', request_method='POST', permission="index")
def index(request):
# Setting request.datastore here only works because routed views are not traversed.
@@ -188,6 +217,8 @@ def update_objects_queue(self, request, counter):
rev_linked_uuids = set()
to_delete = [] # hold messages that will be deleted
to_defer = [] # hold messages once we need to restart the worker
# max_sid does not change over the lifetime of the request
max_sid = request.registry[STORAGE].write.get_max_sid()
deferred = False # if true, we need to restart the worker
messages, target_queue = self.get_messages_from_queue()
while len(messages) > 0:
@@ -214,22 +245,32 @@
# build the object and index into ES
# if strict, do not add uuids rev_linking to item to queue
if msg_body['strict'] is True:
error = self.update_object(request, msg_uuid, None,
sid=msg_sid, curr_time=msg_curr_time,
error = self.update_object(request, msg_uuid,
add_to_secondary=None,
sid=msg_sid, max_sid=max_sid,
curr_time=msg_curr_time,
telemetry_id=msg_telemetry)
else:
error = self.update_object(request, msg_uuid, rev_linked_uuids,
sid=msg_sid, curr_time=msg_curr_time,
error = self.update_object(request, msg_uuid,
add_to_secondary=rev_linked_uuids,
sid=msg_sid, max_sid=max_sid,
curr_time=msg_curr_time,
telemetry_id=msg_telemetry)
if error:
if error.get('error_message') == 'defer_restart':
if error.get('error_message') == 'defer_resend':
# resend the message and delete original so that receive
# count is not affected. set `deferred` to restart worker
to_defer.append(msg_body)
to_delete.append(msg)
deferred = True
elif error.get('error_message') == 'defer_replace':
# replace the message with a VisibilityTimeout
# set `deferred` to restart worker
self.queue.replace_messages([msg], target_queue=target_queue, vis_timeout=180)
deferred = True
else:
# on a regular error, replace the message back in the queue
# regular error, replace the message with a VisibilityTimeout
# could do something with error, like putting on elasticache
# set VisibilityTimeout high so that other items can process
self.queue.replace_messages([msg], target_queue=target_queue, vis_timeout=180)
errors.append(error)
else:
@@ -289,8 +330,7 @@ def update_objects_sync(self, request, sync_uuids, counter):
"""
errors = []
for i, uuid in enumerate(sync_uuids):
# add_to_secondary = None here since invalidation is not used
error = self.update_object(request, uuid, None)
error = self.update_object(request, uuid)
if error is not None: # don't increment counter on an error
errors.append(error)
elif counter:
@@ -299,7 +339,7 @@


def update_object(self, request, uuid, add_to_secondary=None, sid=None,
curr_time=None, telemetry_id=None):
max_sid=None, curr_time=None, telemetry_id=None):
"""
Actually index the uuid using the index-data view.
add_to_secondary is a set that gets the rev_linked_to_me
@@ -308,7 +348,7 @@ def update_object(self, request, uuid, add_to_secondary=None, sid=None,
# logging constant
cat = 'index object'

#timing stuff
# timing stuff
start = timer()
if not curr_time:
curr_time = datetime.datetime.utcnow().isoformat() # utc
@@ -322,33 +362,36 @@ def update_object(self, request, uuid, add_to_secondary=None, sid=None,
if telemetry_id.startswith('cm_run_'):
cm_source = True

# check the sid with a less intensive view than @@index-data
index_data_query = '/%s/@@index-data' % uuid
if sid:
index_data_query += '?sid=%s' % sid

try:
# check sid first -- will raise SidException if invalid
if sid is not None:
check_sid(sid, max_sid) # max_sid should be set already
result = request.embed(index_data_query, as_user='INDEXER')
duration = timer() - start
log.bind(collection=result.get('item_type'))
# log.info("Time to embed", duration=duration, cat="embed object")
except SidException as e:
duration = timer() - start
log.warning('Invalid max sid', duration=duration, cat=cat)
# this will cause the item to be deferred by restarting worker
return {'error_message': 'defer_restart'}
except (MissingIndexItemException, KeyError) as e:
# only consider a KeyError deferrable if not already in deferred queue
log.warning('Invalid max sid. Resending...', duration=duration, cat=cat)
# causes the item to be deferred by restarting worker
# item will be re-sent (won't affect receive count)
return {'error_message': 'defer_resend'}
except MissingIndexItemException:
# cannot find item. This could be due to it being purged.
# if message is from create mapping, simply skip.
# otherwise replace message and item will possibly make it to DLQ
duration = timer() - start
if e.__class__ == MissingIndexItemException and cm_source:
# cannot find the item in the DB by uuid and the message came
# from create-mapping. Skip completely
log.error('Missing create-mapping resource when indexing. Skipping', duration=duration, cat=cat)
if cm_source:
log.error('MissingIndexItemException encountered on resource %s'
' from create_mapping. Skipping...' % index_data_query,
duration=duration, cat=cat)
return
else:
log.info('KeyError', duration=duration, cat=cat)
# this will cause the item to be deferred by restarting worker
return {'error_message': 'defer_restart'}
log.warning('MissingIndexItemException encountered on resource '
'%s. No sid found. Replacing...' % index_data_query,
duration=duration, cat=cat)
return {'error_message': 'defer_replace'}
except Exception as e:
duration = timer() - start
log.error('Error rendering @@index-data', duration=duration, exc_info=True, cat=cat)
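For reference, the new check_sid helper added above can be exercised on its own; a quick sketch with made-up values:

    check_sid(5, 10)        # sid <= max_sid: returns None, nothing raised

    try:
        check_sid(11, 10)   # query sid is ahead of the snapshot max_sid
    except SidException as exc:
        print(exc)          # Query sid (11) is greater than max sid (10).

    try:
        check_sid('abc', 10)
    except ValueError as exc:
        print(exc)          # sid (abc) and max sid (10) must be integers.

In the update_object diff above, the SidException branch is what now returns the 'defer_resend' error handled in update_objects_queue.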
14 changes: 9 additions & 5 deletions src/snovault/elasticsearch/indexer_queue.py
@@ -12,6 +12,7 @@
from pyramid.decorator import reify
from .interfaces import INDEXER_QUEUE, INDEXER_QUEUE_MIRROR
from .indexer_utils import get_uuids_for_types
from collections import OrderedDict

log = structlog.getLogger(__name__)

@@ -70,12 +71,15 @@ def queue_indexing(request):
target = request.json.get('target_queue', 'primary')
if req_uuids:
# queue these as secondary
queued, failed = queue_indexer.add_uuids(request.registry, req_uuids, strict=strict,
target_queue=target, telemetry_id=telemetry_id)
queued, failed = queue_indexer.add_uuids(request.registry, req_uuids,
strict=strict, target_queue=target,
telemetry_id=telemetry_id)
response['requested_uuids'] = req_uuids
else:
# queue these as secondary
queued, failed = queue_indexer.add_collections(request.registry, req_collections, strict=strict,
queued, failed = queue_indexer.add_collections(request.registry,
req_collections,
strict=strict,
target_queue=target,
telemetry_id=telemetry_id)
response['requested_collections'] = req_collections
@@ -175,10 +179,10 @@ def __init__(self, registry, mirror_env=None):
self.second_queue_url = self.get_queue_url(self.second_queue_name)
self.dlq_url = self.get_queue_url(self.dlq_name)
# short names for queues
self.queue_targets = {
self.queue_targets = OrderedDict({
'primary': self.queue_url,
'secondary': self.second_queue_url
}
})

def add_uuids(self, registry, uuids, strict=False, target_queue='primary',
sid=None, telemetry_id=None):
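One note on the OrderedDict change above: it appears intended to make iteration over queue_targets deterministic, so code that walks the targets always sees primary before secondary. A small sketch with placeholder URLs:

    from collections import OrderedDict

    queue_targets = OrderedDict({
        'primary': 'https://sqs.example.com/primary-queue',
        'secondary': 'https://sqs.example.com/secondary-queue',
    })

    # iteration follows insertion order: primary first, then secondary
    for name, url in queue_targets.items():
        print(name, url)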
19 changes: 9 additions & 10 deletions src/snovault/elasticsearch/mpindexer.py
@@ -1,6 +1,9 @@
from snovault import DBSESSION
from contextlib import contextmanager
from multiprocessing import get_context
from multiprocessing import (
get_context,
cpu_count
)
from multiprocessing.pool import Pool
from functools import partial
from pyramid.request import apply_request_extensions
@@ -30,12 +33,7 @@
def includeme(config):
if config.registry.settings.get('indexer_worker'):
return
processes = config.registry.settings.get('indexer.processes')
try:
processes = int(processes)
except:
processes = None
config.registry[INDEXER] = MPIndexer(config.registry, processes=processes)
config.registry[INDEXER] = MPIndexer(config.registry)


### Running in subprocess
@@ -144,10 +142,11 @@ def queue_error_callback(cb_args, counter, errors):
### Running in main process

class MPIndexer(Indexer):
def __init__(self, registry, processes=None):
def __init__(self, registry):
super(MPIndexer, self).__init__(registry)
self.chunksize = int(registry.settings.get('indexer.chunk_size', 1024))
self.processes = processes
num_cpu = cpu_count()
self.processes = num_cpu - 1 if num_cpu > 1 else num_cpu
self.initargs = (registry[APP_FACTORY], registry.settings,)
# workers in the pool will be replaced after finishing one task
self.maxtasks = 1
@@ -173,7 +172,7 @@ def update_objects(self, request, counter=None):
"""
pool = self.init_pool()
sync_uuids = request.json.get('uuids', None)
workers = pool._processes if self.processes is None else self.processes
workers = self.processes
# ensure workers != 0
workers = 1 if workers == 0 else workers
errors = []
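With the indexer.processes setting removed, MPIndexer now sizes its pool from the machine itself; a standalone sketch of the same calculation:

    from multiprocessing import cpu_count

    num_cpu = cpu_count()
    # leave one core for the main indexer process when more than one is available
    processes = num_cpu - 1 if num_cpu > 1 else num_cpu
    print(processes)   # e.g. 8 cores -> 7 worker processes, 1 core -> 1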