Skip to content

Commit

Permalink
Merge pull request #809 from DataDog/0.20.3-dev
Browse files Browse the repository at this point in the history
Release 0.20.3
  • Loading branch information
brettlangdon authored Feb 4, 2019
2 parents 65864ce + 3be17a4 commit c8b1f40
Show file tree
Hide file tree
Showing 10 changed files with 376 additions and 116 deletions.
2 changes: 1 addition & 1 deletion ddtrace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from .tracer import Tracer
from .settings import config

__version__ = '0.20.2'
__version__ = '0.20.3'

# a global tracer instance with integration settings
tracer = Tracer()
Expand Down
10 changes: 10 additions & 0 deletions ddtrace/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,16 @@ class ThreadLocalContext(object):
def __init__(self):
self._locals = threading.local()

def _has_active_context(self):
"""
Determine whether we have a currently active context for this thread
:returns: Whether an active context exists
:rtype: bool
"""
ctx = getattr(self._locals, 'context', None)
return ctx is not None

def set(self, ctx):
setattr(self._locals, 'context', ctx)

Expand Down
43 changes: 30 additions & 13 deletions ddtrace/contrib/asyncio/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,8 @@ class AsyncioContextProvider(DefaultContextProvider):
def activate(self, context, loop=None):
"""Sets the scoped ``Context`` for the current running ``Task``.
"""
try:
loop = loop or asyncio.get_event_loop()
except RuntimeError:
# detects if a loop is available in the current thread;
# This happens when a new thread is created from the one that is running
# the async loop
loop = self._get_loop(loop)
if not loop:
self._local.set(context)
return context

Expand All @@ -35,19 +31,40 @@ def activate(self, context, loop=None):
setattr(task, CONTEXT_ATTR, context)
return context

def _get_loop(self, loop=None):
"""Helper to try and resolve the current loop"""
try:
return loop or asyncio.get_event_loop()
except RuntimeError:
# Detects if a loop is available in the current thread;
# DEV: This happens when a new thread is created from the out that is running the async loop
# DEV: It's possible that a different Executor is handling a different Thread that
# works with blocking code. In that case, we fallback to a thread-local Context.
pass
return None

def _has_active_context(self, loop=None):
"""Helper to determine if we have a currently active context"""
loop = self._get_loop(loop=loop)
if loop is None:
return self._local._has_active_context()

# the current unit of work (if tasks are used)
task = asyncio.Task.current_task(loop=loop)
if task is None:
return False

ctx = getattr(task, CONTEXT_ATTR, None)
return ctx is not None

def active(self, loop=None):
"""
Returns the scoped Context for this execution flow. The ``Context`` uses
the current task as a carrier so if a single task is used for the entire application,
the context must be handled separately.
"""
try:
loop = loop or asyncio.get_event_loop()
except RuntimeError:
# handles RuntimeError: There is no current event loop in thread 'MainThread'
# it happens when it's not possible to get the current event loop.
# It's possible that a different Executor is handling a different Thread that
# works with blocking code. In that case, we fallback to a thread-local Context.
loop = self._get_loop(loop=loop)
if not loop:
return self._local.get()

# the current unit of work (if tasks are used)
Expand Down
23 changes: 20 additions & 3 deletions ddtrace/contrib/futures/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,24 @@ def _wrap_submit(func, instance, args, kwargs):
thread. This wrapper ensures that a new `Context` is created and
properly propagated using an intermediate function.
"""
# propagate the same Context in the new thread
current_ctx = ddtrace.tracer.context_provider.active()
# If there isn't a currently active context, then do not create one
# DEV: Calling `.active()` when there isn't an active context will create a new context
# DEV: We need to do this in case they are either:
# - Starting nested futures
# - Starting futures from outside of an existing context
#
# In either of these cases we essentially will propagate the wrong context between futures
#
# The resolution is to not create/propagate a new context if one does not exist, but let the
# future's thread create the context instead.
current_ctx = None
if ddtrace.tracer.context_provider._has_active_context():
current_ctx = ddtrace.tracer.context_provider.active()

# If we have a context then make sure we clone it
# DEV: We don't know if the future will finish executing before the parent span finishes
# so we clone to ensure we properly collect/report the future's spans
current_ctx = current_ctx.clone()

# extract the target function that must be executed in
# a new thread and the `target` arguments
Expand All @@ -25,5 +41,6 @@ def _wrap_execution(ctx, fn, args, kwargs):
provider sets the Active context in a thread local storage
variable because it's outside the asynchronous loop.
"""
ddtrace.tracer.context_provider.activate(ctx)
if ctx is not None:
ddtrace.tracer.context_provider.activate(ctx)
return fn(*args, **kwargs)
15 changes: 13 additions & 2 deletions ddtrace/contrib/gevent/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ class GeventContextProvider(BaseContextProvider):
in the ``gevent`` library. Framework instrumentation that uses the
gevent WSGI server (or gevent in general), can use this provider.
"""
def _get_current_context(self):
"""Helper to get the current context from the current greenlet"""
current_g = gevent.getcurrent()
if current_g is not None:
return getattr(current_g, CONTEXT_ATTR, None)
return None

def _has_active_context(self):
"""Helper to determine if we have a currently active context"""
return self._get_current_context() is not None

def activate(self, context):
"""Sets the scoped ``Context`` for the current running ``Greenlet``.
"""
Expand All @@ -29,15 +40,15 @@ def active(self):
uses the ``Greenlet`` class as a carrier, and everytime a greenlet
is created it receives the "parent" context.
"""
current_g = gevent.getcurrent()
ctx = getattr(current_g, CONTEXT_ATTR, None)
ctx = self._get_current_context()
if ctx is not None:
# return the active Context for this greenlet (if any)
return ctx

# the Greenlet doesn't have a Context so it's created and attached
# even to the main greenlet. This is required in Distributed Tracing
# when a new arbitrary Context is provided.
current_g = gevent.getcurrent()
if current_g:
ctx = Context()
setattr(current_g, CONTEXT_ATTR, ctx)
Expand Down
30 changes: 23 additions & 7 deletions ddtrace/contrib/tornado/stack_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,41 @@ def __exit__(self, type, value, traceback):
def deactivate(self):
self._active = False

def _has_io_loop(self):
"""Helper to determine if we are currently in an IO loop"""
return getattr(IOLoop._current, 'instance', None) is not None

def _has_active_context(self):
"""Helper to determine if we have an active context or not"""
if not self._has_io_loop():
return self._local._has_active_context()
else:
# we're inside a Tornado loop so the TracerStackContext is used
return self._get_state_active_context() is not None

def _get_state_active_context(self):
"""Helper to get the currently active context from the TracerStackContext"""
# we're inside a Tornado loop so the TracerStackContext is used
for stack in reversed(_state.contexts[0]):
if isinstance(stack, self.__class__) and stack._active:
return stack._context
return None

def active(self):
"""
Return the ``Context`` from the current execution flow. This method can be
used inside a Tornado coroutine to retrieve and use the current tracing context.
If used in a separated Thread, the `_state` thread-local storage is used to
propagate the current Active context from the `MainThread`.
"""
io_loop = getattr(IOLoop._current, 'instance', None)
if io_loop is None:
if not self._has_io_loop():
# if a Tornado loop is not available, it means that this method
# has been called from a synchronous code, so we can rely in a
# thread-local storage
return self._local.get()
else:
# we're inside a Tornado loop so the TracerStackContext is used
for stack in reversed(_state.contexts[0]):
if isinstance(stack, self.__class__) and stack._active:
return stack._context
return self._get_state_active_context()

def activate(self, ctx):
"""
Expand All @@ -83,8 +100,7 @@ def activate(self, ctx):
If used in a separated Thread, the `_state` thread-local storage is used to
propagate the current Active context from the `MainThread`.
"""
io_loop = getattr(IOLoop._current, 'instance', None)
if io_loop is None:
if not self._has_io_loop():
# because we're outside of an asynchronous execution, we store
# the current context in a thread-local storage
self._local.set(ctx)
Expand Down
12 changes: 12 additions & 0 deletions ddtrace/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ class BaseContextProvider(object):
* the ``active`` method, that returns the current active ``Context``
* the ``activate`` method, that sets the current active ``Context``
"""
def _has_active_context(self):
raise NotImplementedError

def activate(self, context):
raise NotImplementedError

Expand All @@ -32,6 +35,15 @@ class DefaultContextProvider(BaseContextProvider):
def __init__(self):
self._local = ThreadLocalContext()

def _has_active_context(self):
"""
Check whether we have a currently active context.
:returns: Whether we have an active context
:rtype: bool
"""
return self._local._has_active_context()

def activate(self, context):
"""Makes the given ``context`` active, so that the provider calls
the thread-local storage implementation.
Expand Down
14 changes: 12 additions & 2 deletions tests/base/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import contextlib
import unittest

from ddtrace import config
import ddtrace

from ..utils.tracer import DummyTracer
from ..utils.span import TestSpanContainer, TestSpan, NO_CHILDREN
Expand Down Expand Up @@ -30,7 +30,7 @@ def override_config(self, integration, values):
>>> with self.override_config('flask', dict(service_name='test-service')):
# Your test
"""
options = getattr(config, integration)
options = getattr(ddtrace.config, integration)

original = dict(
(key, options.get(key))
Expand Down Expand Up @@ -81,3 +81,13 @@ def assert_structure(self, root, children=NO_CHILDREN):
"""Helper to call TestSpanNode.assert_structure on the current root span"""
root_span = self.get_root_span()
root_span.assert_structure(root, children)

@contextlib.contextmanager
def override_global_tracer(self, tracer=None):
original = ddtrace.tracer
tracer = tracer or self.tracer
setattr(ddtrace, 'tracer', tracer)
try:
yield
finally:
setattr(ddtrace, 'tracer', original)
Loading

0 comments on commit c8b1f40

Please sign in to comment.