From c3a37cdc129c96d78f1c4285e1918c6684c3d682 Mon Sep 17 00:00:00 2001 From: Jason Parham Date: Tue, 13 Oct 2020 15:36:43 -0700 Subject: [PATCH] Print exceptions in worker --- wbia/web/job_engine.py | 142 ++++++++++++++++++++++------------------- 1 file changed, 77 insertions(+), 65 deletions(-) diff --git a/wbia/web/job_engine.py b/wbia/web/job_engine.py index 571a2d31b0..c84f84f209 100644 --- a/wbia/web/job_engine.py +++ b/wbia/web/job_engine.py @@ -87,7 +87,7 @@ # VERBOSE_JOBS = ( # ut.get_argflag('--bg') or ut.get_argflag('--fg') or ut.get_argflag('--verbose-jobs') # ) -VERBOSE_JOBS = True +VERBOSE_JOBS = False GLOBAL_SHELVE_LOCK = multiprocessing.Lock() @@ -1569,76 +1569,88 @@ def engine_loop(id_, port_dict, dbdir, containerized, lane): try: while True: - idents, engine_request = rcv_multipart_json(engine_send_sock, print=print) - - action = engine_request['action'] - jobid = engine_request['jobid'] - args = engine_request['args'] - kwargs = engine_request['kwargs'] - callback_url = engine_request['callback_url'] - callback_method = engine_request['callback_method'] - lane_ = engine_request['lane'] - - if VERBOSE_JOBS: - print('\tjobid = %r' % (jobid,)) - print('\taction = %r' % (action,)) - print('\targs = %r' % (args,)) - print('\tkwargs = %r' % (kwargs,)) - print('\tlane = %r' % (lane,)) - print('\tlane_ = %r' % (lane_,)) - - # Notify start working - reply_notify = { - # 'idents': idents, - 'jobid': jobid, - 'status': 'working', - 'action': 'notification', - } - collect_recieve_socket.send_json(reply_notify) + try: + idents, engine_request = rcv_multipart_json( + engine_send_sock, print=print + ) - engine_result = on_engine_request(ibs, jobid, action, args, kwargs) - exec_status = engine_result['exec_status'] + action = engine_request['action'] + jobid = engine_request['jobid'] + args = engine_request['args'] + kwargs = engine_request['kwargs'] + callback_url = engine_request['callback_url'] + callback_method = engine_request['callback_method'] + lane_ = engine_request['lane'] - # Notify start working - reply_notify = { - # 'idents': idents, - 'jobid': jobid, - 'status': 'publishing', - 'action': 'notification', - } - collect_recieve_socket.send_json(reply_notify) - - # Store results in the collector - collect_request = { - # 'idents': idents, - 'action': 'store', - 'jobid': jobid, - 'engine_result': engine_result, - 'callback_url': callback_url, - 'callback_method': callback_method, - } - # if VERBOSE_JOBS: - print( - '...done working. pushing result to collector for jobid %s' % (jobid,) - ) + if VERBOSE_JOBS: + print('\tjobid = %r' % (jobid,)) + print('\taction = %r' % (action,)) + print('\targs = %r' % (args,)) + print('\tkwargs = %r' % (kwargs,)) + print('\tlane = %r' % (lane,)) + print('\tlane_ = %r' % (lane_,)) + + # Notify start working + reply_notify = { + # 'idents': idents, + 'jobid': jobid, + 'status': 'working', + 'action': 'notification', + } + collect_recieve_socket.send_json(reply_notify) - # CALLS: collector_store - collect_recieve_socket.send_json(collect_request) + engine_result = on_engine_request(ibs, jobid, action, args, kwargs) + exec_status = engine_result['exec_status'] - # Notify start working - reply_notify = { - # 'idents': idents, - 'jobid': jobid, - 'status': exec_status, - 'action': 'notification', - } - collect_recieve_socket.send_json(reply_notify) + # Notify start working + reply_notify = { + # 'idents': idents, + 'jobid': jobid, + 'status': 'publishing', + 'action': 'notification', + } + collect_recieve_socket.send_json(reply_notify) - # We no longer need the engine result, and can clear it's memory - engine_request = None - engine_result = None - collect_request = None + # Store results in the collector + collect_request = { + # 'idents': idents, + 'action': 'store', + 'jobid': jobid, + 'engine_result': engine_result, + 'callback_url': callback_url, + 'callback_method': callback_method, + } + # if VERBOSE_JOBS: + print( + '...done working. pushing result to collector for jobid %s' + % (jobid,) + ) + # CALLS: collector_store + collect_recieve_socket.send_json(collect_request) + + # Notify start working + reply_notify = { + # 'idents': idents, + 'jobid': jobid, + 'status': exec_status, + 'action': 'notification', + } + collect_recieve_socket.send_json(reply_notify) + + # We no longer need the engine result, and can clear it's memory + engine_request = None + engine_result = None + collect_request = None + except KeyboardInterrupt: + raise + except Exception as ex: + result = ut.formatex(ex, keys=['jobid'], tb=True) + result = ut.strip_ansi(result) + print_ = partial(ut.colorprint, color='brightred') + with ut.Indenter('[job engine worker error] '): + print_(result) + raise except KeyboardInterrupt: print('Caught ctrl+c in engine loop. Gracefully exiting')