Skip to content

Commit

Permalink
Merge pull request #795 from DataDog/0.20.1-dev
Browse files Browse the repository at this point in the history
0.20.1
  • Loading branch information
brettlangdon authored Jan 16, 2019
2 parents 05d8402 + 02149d0 commit c509eac
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 20 deletions.
2 changes: 1 addition & 1 deletion ddtrace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from .tracer import Tracer
from .settings import config

__version__ = '0.20.0'
__version__ = '0.20.1'

# a global tracer instance with integration settings
tracer = Tracer()
Expand Down
6 changes: 3 additions & 3 deletions ddtrace/contrib/celery/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def trace_before_publish(*args, **kwargs):
# Note: adding tags from `traceback` or `state` calls will make an
# API call to the backend for the properties so we should rely
# only on the given `Context`
attach_span(task, task_id, span)
attach_span(task, task_id, span, is_publish=True)


def trace_after_publish(*args, **kwargs):
Expand All @@ -102,12 +102,12 @@ def trace_after_publish(*args, **kwargs):
return

# retrieve and finish the Span
span = retrieve_span(task, task_id)
span = retrieve_span(task, task_id, is_publish=True)
if span is None:
return
else:
span.finish()
detach_span(task, task_id)
detach_span(task, task_id, is_publish=True)


def trace_failure(*args, **kwargs):
Expand Down
29 changes: 22 additions & 7 deletions ddtrace/contrib/celery/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,40 +39,55 @@ def tags_from_context(context):
return tags


def attach_span(task, task_id, span):
def attach_span(task, task_id, span, is_publish=False):
"""Helper to propagate a `Span` for the given `Task` instance. This
function uses a `WeakValueDictionary` that stores a Datadog Span using
the `task_id` as a key. This is useful when information must be
the `(task_id, is_publish)` as a key. This is useful when information must be
propagated from one Celery signal to another.
DEV: We use (task_id, is_publish) for the key to ensure that publishing a
task from within another task does not cause any conflicts.
This mostly happens when either a task fails and a retry policy is in place,
or when a task is manually retries (e.g. `task.retry()`), we end up trying
to publish a task with the same id as the task currently running.
Previously publishing the new task would overwrite the existing `celery.run` span
in the `weak_dict` causing that span to be forgotten and never finished.
NOTE: We cannot test for this well yet, because we do not run a celery worker,
and cannot run `task.apply_async()`
"""
weak_dict = getattr(task, CTX_KEY, None)
if weak_dict is None:
weak_dict = WeakValueDictionary()
setattr(task, CTX_KEY, weak_dict)

weak_dict[task_id] = span
weak_dict[(task_id, is_publish)] = span


def detach_span(task, task_id):
def detach_span(task, task_id, is_publish=False):
"""Helper to remove a `Span` in a Celery task when it's propagated.
This function handles tasks where the `Span` is not attached.
"""
weak_dict = getattr(task, CTX_KEY, None)
if weak_dict is None:
return

weak_dict.pop(task_id, None)
# DEV: See note in `attach_span` for key info
weak_dict.pop((task_id, is_publish), None)


def retrieve_span(task, task_id):
def retrieve_span(task, task_id, is_publish=False):
"""Helper to retrieve an active `Span` stored in a `Task`
instance
"""
weak_dict = getattr(task, CTX_KEY, None)
if weak_dict is None:
return
else:
return weak_dict.get(task_id)
# DEV: See note in `attach_span` for key info
return weak_dict.get((task_id, is_publish))


def retrieve_task_id(context):
Expand Down
11 changes: 6 additions & 5 deletions tests/contrib/celery/base.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,30 @@
import unittest

from celery import Celery

from ddtrace import Pin, config
from ddtrace.compat import PY2
from ddtrace.contrib.celery import patch, unpatch

from ..config import REDIS_CONFIG
from ...test_tracer import get_dummy_tracer
from ...base import BaseTracerTestCase


REDIS_URL = 'redis://127.0.0.1:{port}'.format(port=REDIS_CONFIG['port'])
BROKER_URL = '{redis}/{db}'.format(redis=REDIS_URL, db=0)
BACKEND_URL = '{redis}/{db}'.format(redis=REDIS_URL, db=1)


class CeleryBaseTestCase(unittest.TestCase):
class CeleryBaseTestCase(BaseTracerTestCase):
"""Test case that handles a full fledged Celery application with a
custom tracer. It patches the new Celery application.
"""

def setUp(self):
super(CeleryBaseTestCase, self).setUp()

# keep track of original config
self._config = dict(config.celery)
# instrument Celery and create an app with Broker and Result backends
patch()
self.tracer = get_dummy_tracer()
self.pin = Pin(service='celery-unittest', tracer=self.tracer)
self.app = Celery('celery.test_app', broker=BROKER_URL, backend=BACKEND_URL)
# override pins to use our Dummy Tracer
Expand All @@ -39,6 +38,8 @@ def tearDown(self):
config.celery.update(self._config)
self._config = None

super(CeleryBaseTestCase, self).tearDown()

def assert_items_equal(self, a, b):
if PY2:
return self.assertItemsEqual(a, b)
Expand Down
9 changes: 5 additions & 4 deletions tests/contrib/celery/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def fn_task():
# delete the Span
weak_dict = getattr(fn_task, '__dd_task_span')
detach_span(fn_task, task_id)
ok_(weak_dict.get(task_id) is None)
ok_(weak_dict.get((task_id, False)) is None)

def test_span_delete_empty(self):
# ensure the helper works even if the Task doesn't have
Expand Down Expand Up @@ -119,13 +119,14 @@ def fn_task():
task_id = '7c6731af-9533-40c3-83a9-25b58f0d837f'
attach_span(fn_task, task_id, self.tracer.trace('celery.run'))
weak_dict = getattr(fn_task, '__dd_task_span')
ok_(weak_dict.get(task_id))
key = (task_id, False)
ok_(weak_dict.get(key))
# flush data and force the GC
weak_dict.get(task_id).finish()
weak_dict.get(key).finish()
self.tracer.writer.pop()
self.tracer.writer.pop_traces()
gc.collect()
ok_(weak_dict.get(task_id) is None)
ok_(weak_dict.get(key) is None)

def test_task_id_from_protocol_v1(self):
# ensures a `task_id` is properly returned when Protocol v1 is used.
Expand Down

0 comments on commit c509eac

Please sign in to comment.