Skip to content

Commit

Permalink
Cleanup celery/app/__init__.py module
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Oct 31, 2016
1 parent 3e88ac9 commit d854e07
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 79 deletions.
61 changes: 60 additions & 1 deletion celery/_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@
#: Global default app used when no current app.
default_app = None

#: Function returning the app provided or the default app if none.
#:
#: The environment variable :envvar:`CELERY_TRACE_APP` is used to
#: trace app leaks. When enabled an exception is raised if there
#: is no active app.
app_or_default = None

#: List of all app instances (weakrefs), mustn't be used directly.
_apps = weakref.WeakSet()

Expand Down Expand Up @@ -64,6 +71,16 @@ class _TLS(threading.local):
_task_stack = LocalStack()


#: Function used to push a task to the thread local stack
#: keeping track of the currently executing task.
#: You must remember to pop the task after.
push_current_task = _task_stack.push

#: Function used to pop a task from the thread local stack
#: keeping track of the currently executing task.
pop_current_task = _task_stack.pop


def set_default_app(app):
"""Set default app."""
global default_app
Expand All @@ -73,7 +90,7 @@ def set_default_app(app):
def _get_current_app():
if default_app is None:
#: creates the global fallback app instance.
from celery.app import Celery
from celery.app.base import Celery
set_default_app(Celery(
'default', fixups=[], set_as_current=False,
loader=os.environ.get('CELERY_LOADER') or 'default',
Expand Down Expand Up @@ -133,3 +150,45 @@ def _deregister_app(app):

def _get_active_apps():
return _apps


def _app_or_default(app=None):
if app is None:
return get_current_app()
return app


def _app_or_default_trace(app=None): # pragma: no cover
from traceback import print_stack
try:
from billiard.process import current_process
except ImportError:
current_process = None
if app is None:
if getattr(_tls, 'current_app', None):
print('-- RETURNING TO CURRENT APP --') # noqa+
print_stack()
return _tls.current_app
if not current_process or current_process()._name == 'MainProcess':
raise Exception('DEFAULT APP')
print('-- RETURNING TO DEFAULT APP --') # noqa+
print_stack()
return default_app
return app


def enable_trace():
"""Enable tracing of app instances."""
global app_or_default
app_or_default = _app_or_default_trace


def disable_trace():
"""Disable tracing of app instances."""
global app_or_default
app_or_default = _app_or_default

if os.environ.get('CELERY_TRACE_APP'): # pragma: no cover
enable_trace()
else:
disable_trace()
80 changes: 10 additions & 70 deletions celery/app/__init__.py
Original file line number Diff line number Diff line change
@@ -1,88 +1,28 @@
# -*- coding: utf-8 -*-
"""Celery Application."""
from __future__ import absolute_import, print_function, unicode_literals
import os
from celery.local import Proxy
from celery import _state
from celery._state import (
get_current_app as current_app,
get_current_task as current_task,
connect_on_app_finalize, set_default_app, _get_active_apps, _task_stack,
app_or_default, enable_trace, disable_trace,
push_current_task, pop_current_task,
)
from .base import Celery, AppPickler
from .base import Celery
from .utils import AppPickler

__all__ = [
'Celery', 'AppPickler', 'default_app', 'app_or_default',
'Celery', 'AppPickler', 'app_or_default', 'default_app',
'bugreport', 'enable_trace', 'disable_trace', 'shared_task',
'set_default_app', 'current_app', 'current_task',
'push_current_task', 'pop_current_task',
]

#: Proxy always returning the app set as default.
default_app = Proxy(lambda: _state.default_app)

#: Function returning the app provided or the default app if none.
#:
#: The environment variable :envvar:`CELERY_TRACE_APP` is used to
#: trace app leaks. When enabled an exception is raised if there
#: is no active app.
app_or_default = None

#: Function used to push a task to the thread local stack
#: keeping track of the currently executing task.
#: You must remember to pop the task after.
push_current_task = _task_stack.push

#: Function used to pop a task from the thread local stack
#: keeping track of the currently executing task.
pop_current_task = _task_stack.pop


def bugreport(app=None):
"""Return information useful in bug reports."""
return (app or current_app()).bugreport()


def _app_or_default(app=None):
if app is None:
return _state.get_current_app()
return app


def _app_or_default_trace(app=None): # pragma: no cover
from traceback import print_stack
try:
from billiard.process import current_process
except ImportError:
current_process = None
if app is None:
if getattr(_state._tls, 'current_app', None):
print('-- RETURNING TO CURRENT APP --') # noqa+
print_stack()
return _state._tls.current_app
if not current_process or current_process()._name == 'MainProcess':
raise Exception('DEFAULT APP')
print('-- RETURNING TO DEFAULT APP --') # noqa+
print_stack()
return _state.default_app
return app


def enable_trace():
"""Enable tracing of app instances."""
global app_or_default
app_or_default = _app_or_default_trace


def disable_trace():
"""Disable tracing of app instances."""
global app_or_default
app_or_default = _app_or_default

if os.environ.get('CELERY_TRACE_APP'): # pragma: no cover
enable_trace()
else:
disable_trace()
return (app or _state.get_current_app()).bugreport()


def shared_task(*args, **kwargs):
Expand Down Expand Up @@ -114,21 +54,21 @@ def create_shared_task(**options):
def __inner(fun):
name = options.get('name')
# Set as shared task so that unfinalized apps,
# and future apps will load the task.
connect_on_app_finalize(
# and future apps will register a copy of this task.
_state.connect_on_app_finalize(
lambda app: app._task_from_fun(fun, **options)
)

# Force all finalized apps to take this task as well.
for app in _get_active_apps():
for app in _state._get_active_apps():
if app.finalized:
with app._finalize_mutex:
app._task_from_fun(fun, **options)

# Return a proxy that always gets the task from the current
# apps task registry.
def task_by_cons():
app = current_app()
app = _state.get_current_app()
return app.tasks[
name or app.gen_task_name(fun.__name__, fun.__module__)
]
Expand Down
3 changes: 1 addition & 2 deletions celery/app/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from celery import current_app, group
from celery import states, signals
from celery._state import _task_stack
from celery.app import set_default_app
from celery.app.task import Task as BaseTask, Context
from celery.exceptions import Ignore, Reject, Retry, InvalidTaskError
from celery.five import monotonic, text_t
Expand Down Expand Up @@ -563,7 +562,7 @@ def setup_worker_optimizations(app, hostname=None):
# and means that only a single app can be used for workers
# running in the same process.
app.set_current()
set_default_app(app)
app.set_default()

# evaluate all task classes by finalizing the app.
app.finalize()
Expand Down
4 changes: 2 additions & 2 deletions celery/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from celery import states
from celery import current_app, group, maybe_signature
from celery.app import current_task
from celery._state import get_current_task
from celery.exceptions import (
ChordError, TimeoutError, TaskRevokedError, ImproperlyConfigured,
)
Expand Down Expand Up @@ -425,7 +425,7 @@ def apply_chord(self, header, partial_args, group_id, body,
return result

def current_task_children(self, request=None):
request = request or getattr(current_task(), 'request', None)
request = request or getattr(get_current_task(), 'request', None)
if request:
return [r.as_tuple() for r in getattr(request, 'children', [])]

Expand Down
3 changes: 1 addition & 2 deletions celery/backends/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
from kombu.utils.compat import register_after_fork
from kombu.utils.objects import cached_property

from celery import current_task
from celery import states
from celery._state import task_join_will_block
from celery._state import current_task, task_join_will_block
from celery.five import items, range

from . import base
Expand Down
4 changes: 2 additions & 2 deletions t/unit/app/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -916,9 +916,9 @@ class test_debugging_utils:
def test_enable_disable_trace(self):
try:
_app.enable_trace()
assert _app.app_or_default == _app._app_or_default_trace
assert _state.app_or_default == _state._app_or_default_trace
_app.disable_trace()
assert _app.app_or_default == _app._app_or_default
assert _state.app_or_default == _state._app_or_default
finally:
_app.disable_trace()

Expand Down

0 comments on commit d854e07

Please sign in to comment.