From d854e071bf422f148ca22164dccc23b3121a72a8 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Mon, 31 Oct 2016 16:11:57 -0700 Subject: [PATCH] Cleanup celery/app/__init__.py module --- celery/_state.py | 61 ++++++++++++++++++++++++++++++- celery/app/__init__.py | 80 ++++++----------------------------------- celery/app/trace.py | 3 +- celery/backends/base.py | 4 +-- celery/backends/rpc.py | 3 +- t/unit/app/test_app.py | 4 +-- 6 files changed, 76 insertions(+), 79 deletions(-) diff --git a/celery/_state.py b/celery/_state.py index ecf6b919aaa..9b1e2350afb 100644 --- a/celery/_state.py +++ b/celery/_state.py @@ -23,6 +23,13 @@ #: Global default app used when no current app. default_app = None +#: Function returning the app provided or the default app if none. +#: +#: The environment variable :envvar:`CELERY_TRACE_APP` is used to +#: trace app leaks. When enabled an exception is raised if there +#: is no active app. +app_or_default = None + #: List of all app instances (weakrefs), mustn't be used directly. _apps = weakref.WeakSet() @@ -64,6 +71,16 @@ class _TLS(threading.local): _task_stack = LocalStack() +#: Function used to push a task to the thread local stack +#: keeping track of the currently executing task. +#: You must remember to pop the task after. +push_current_task = _task_stack.push + +#: Function used to pop a task from the thread local stack +#: keeping track of the currently executing task. +pop_current_task = _task_stack.pop + + def set_default_app(app): """Set default app.""" global default_app @@ -73,7 +90,7 @@ def set_default_app(app): def _get_current_app(): if default_app is None: #: creates the global fallback app instance. - from celery.app import Celery + from celery.app.base import Celery set_default_app(Celery( 'default', fixups=[], set_as_current=False, loader=os.environ.get('CELERY_LOADER') or 'default', @@ -133,3 +150,45 @@ def _deregister_app(app): def _get_active_apps(): return _apps + + +def _app_or_default(app=None): + if app is None: + return get_current_app() + return app + + +def _app_or_default_trace(app=None): # pragma: no cover + from traceback import print_stack + try: + from billiard.process import current_process + except ImportError: + current_process = None + if app is None: + if getattr(_tls, 'current_app', None): + print('-- RETURNING TO CURRENT APP --') # noqa+ + print_stack() + return _tls.current_app + if not current_process or current_process()._name == 'MainProcess': + raise Exception('DEFAULT APP') + print('-- RETURNING TO DEFAULT APP --') # noqa+ + print_stack() + return default_app + return app + + +def enable_trace(): + """Enable tracing of app instances.""" + global app_or_default + app_or_default = _app_or_default_trace + + +def disable_trace(): + """Disable tracing of app instances.""" + global app_or_default + app_or_default = _app_or_default + +if os.environ.get('CELERY_TRACE_APP'): # pragma: no cover + enable_trace() +else: + disable_trace() diff --git a/celery/app/__init__.py b/celery/app/__init__.py index 9437444b9c4..051824ddd49 100644 --- a/celery/app/__init__.py +++ b/celery/app/__init__.py @@ -1,88 +1,28 @@ # -*- coding: utf-8 -*- """Celery Application.""" from __future__ import absolute_import, print_function, unicode_literals -import os from celery.local import Proxy from celery import _state from celery._state import ( - get_current_app as current_app, - get_current_task as current_task, - connect_on_app_finalize, set_default_app, _get_active_apps, _task_stack, + app_or_default, enable_trace, disable_trace, + push_current_task, pop_current_task, ) -from .base import Celery, AppPickler +from .base import Celery +from .utils import AppPickler __all__ = [ - 'Celery', 'AppPickler', 'default_app', 'app_or_default', + 'Celery', 'AppPickler', 'app_or_default', 'default_app', 'bugreport', 'enable_trace', 'disable_trace', 'shared_task', - 'set_default_app', 'current_app', 'current_task', 'push_current_task', 'pop_current_task', ] #: Proxy always returning the app set as default. default_app = Proxy(lambda: _state.default_app) -#: Function returning the app provided or the default app if none. -#: -#: The environment variable :envvar:`CELERY_TRACE_APP` is used to -#: trace app leaks. When enabled an exception is raised if there -#: is no active app. -app_or_default = None - -#: Function used to push a task to the thread local stack -#: keeping track of the currently executing task. -#: You must remember to pop the task after. -push_current_task = _task_stack.push - -#: Function used to pop a task from the thread local stack -#: keeping track of the currently executing task. -pop_current_task = _task_stack.pop - def bugreport(app=None): """Return information useful in bug reports.""" - return (app or current_app()).bugreport() - - -def _app_or_default(app=None): - if app is None: - return _state.get_current_app() - return app - - -def _app_or_default_trace(app=None): # pragma: no cover - from traceback import print_stack - try: - from billiard.process import current_process - except ImportError: - current_process = None - if app is None: - if getattr(_state._tls, 'current_app', None): - print('-- RETURNING TO CURRENT APP --') # noqa+ - print_stack() - return _state._tls.current_app - if not current_process or current_process()._name == 'MainProcess': - raise Exception('DEFAULT APP') - print('-- RETURNING TO DEFAULT APP --') # noqa+ - print_stack() - return _state.default_app - return app - - -def enable_trace(): - """Enable tracing of app instances.""" - global app_or_default - app_or_default = _app_or_default_trace - - -def disable_trace(): - """Disable tracing of app instances.""" - global app_or_default - app_or_default = _app_or_default - -if os.environ.get('CELERY_TRACE_APP'): # pragma: no cover - enable_trace() -else: - disable_trace() + return (app or _state.get_current_app()).bugreport() def shared_task(*args, **kwargs): @@ -114,13 +54,13 @@ def create_shared_task(**options): def __inner(fun): name = options.get('name') # Set as shared task so that unfinalized apps, - # and future apps will load the task. - connect_on_app_finalize( + # and future apps will register a copy of this task. + _state.connect_on_app_finalize( lambda app: app._task_from_fun(fun, **options) ) # Force all finalized apps to take this task as well. - for app in _get_active_apps(): + for app in _state._get_active_apps(): if app.finalized: with app._finalize_mutex: app._task_from_fun(fun, **options) @@ -128,7 +68,7 @@ def __inner(fun): # Return a proxy that always gets the task from the current # apps task registry. def task_by_cons(): - app = current_app() + app = _state.get_current_app() return app.tasks[ name or app.gen_task_name(fun.__name__, fun.__module__) ] diff --git a/celery/app/trace.py b/celery/app/trace.py index 79bc4a6cf21..23f98a5770a 100644 --- a/celery/app/trace.py +++ b/celery/app/trace.py @@ -32,7 +32,6 @@ from celery import current_app, group from celery import states, signals from celery._state import _task_stack -from celery.app import set_default_app from celery.app.task import Task as BaseTask, Context from celery.exceptions import Ignore, Reject, Retry, InvalidTaskError from celery.five import monotonic, text_t @@ -563,7 +562,7 @@ def setup_worker_optimizations(app, hostname=None): # and means that only a single app can be used for workers # running in the same process. app.set_current() - set_default_app(app) + app.set_default() # evaluate all task classes by finalizing the app. app.finalize() diff --git a/celery/backends/base.py b/celery/backends/base.py index 594f37a194f..3c10e5f1e62 100644 --- a/celery/backends/base.py +++ b/celery/backends/base.py @@ -25,7 +25,7 @@ from celery import states from celery import current_app, group, maybe_signature -from celery.app import current_task +from celery._state import get_current_task from celery.exceptions import ( ChordError, TimeoutError, TaskRevokedError, ImproperlyConfigured, ) @@ -425,7 +425,7 @@ def apply_chord(self, header, partial_args, group_id, body, return result def current_task_children(self, request=None): - request = request or getattr(current_task(), 'request', None) + request = request or getattr(get_current_task(), 'request', None) if request: return [r.as_tuple() for r in getattr(request, 'children', [])] diff --git a/celery/backends/rpc.py b/celery/backends/rpc.py index 2602688bdd0..844c2cad755 100644 --- a/celery/backends/rpc.py +++ b/celery/backends/rpc.py @@ -12,9 +12,8 @@ from kombu.utils.compat import register_after_fork from kombu.utils.objects import cached_property -from celery import current_task from celery import states -from celery._state import task_join_will_block +from celery._state import current_task, task_join_will_block from celery.five import items, range from . import base diff --git a/t/unit/app/test_app.py b/t/unit/app/test_app.py index 21615f85b94..528dd5f30eb 100644 --- a/t/unit/app/test_app.py +++ b/t/unit/app/test_app.py @@ -916,9 +916,9 @@ class test_debugging_utils: def test_enable_disable_trace(self): try: _app.enable_trace() - assert _app.app_or_default == _app._app_or_default_trace + assert _state.app_or_default == _state._app_or_default_trace _app.disable_trace() - assert _app.app_or_default == _app._app_or_default + assert _state.app_or_default == _state._app_or_default finally: _app.disable_trace()