From 14003a8964c5d62374732d135a225e480afd083a Mon Sep 17 00:00:00 2001 From: kyle-verhoog Date: Mon, 26 Nov 2018 17:37:16 +0100 Subject: [PATCH 01/11] [core] make writing services a no-op --- ddtrace/writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddtrace/writer.py b/ddtrace/writer.py index f6e13c6a317..6920a9bd7c7 100644 --- a/ddtrace/writer.py +++ b/ddtrace/writer.py @@ -41,7 +41,7 @@ def write(self, spans=None, services=None): self._traces.add(spans) if services: - self._services.add(services) + pass def _reset_worker(self): # if this queue was created in a different process (i.e. this was From 5f9edf9f6b3a8f970fc8c856fb0771919597de4a Mon Sep 17 00:00:00 2001 From: kyle-verhoog Date: Mon, 26 Nov 2018 17:58:13 +0100 Subject: [PATCH 02/11] [core] remove set_service_info logic --- ddtrace/tracer.py | 26 ++++---------------------- ddtrace/writer.py | 3 --- 2 files changed, 4 insertions(+), 25 deletions(-) diff --git a/ddtrace/tracer.py b/ddtrace/tracer.py index 05e9ce86354..32677295346 100644 --- a/ddtrace/tracer.py +++ b/ddtrace/tracer.py @@ -336,32 +336,14 @@ def write(self, spans): def set_service_info(self, service, app, app_type): """Set the information about the given service. + Note: this method no longer performs any sending of service info to the + agent. + :param str service: the internal name of the service (e.g. acme_search, datadog_web) :param str app: the off the shelf name of the application (e.g. rails, postgres, custom-app) :param str app_type: the type of the application (e.g. db, web) """ - try: - # don't bother sending the same services over and over. - info = (service, app, app_type) - if self._services.get(service, None) == info: - return - self._services[service] = info - - if self.debug_logging: - log.debug("set_service_info: service:%s app:%s type:%s", service, app, app_type) - - # If we had changes, send them to the writer. - if self.enabled and self.writer: - - # translate to the form the server understands. - services = {} - for service, app, app_type in self._services.values(): - services[service] = {"app" : app, "app_type" : app_type} - - # queue them for writes. - self.writer.write(services=services) - except Exception: - log.debug("error setting service info", exc_info=True) + pass def wrap(self, name=None, service=None, resource=None, span_type=None): """ diff --git a/ddtrace/writer.py b/ddtrace/writer.py index 6920a9bd7c7..a248a466af2 100644 --- a/ddtrace/writer.py +++ b/ddtrace/writer.py @@ -40,9 +40,6 @@ def write(self, spans=None, services=None): if spans: self._traces.add(spans) - if services: - pass - def _reset_worker(self): # if this queue was created in a different process (i.e. this was # forked) reset everything so that we can safely work from it. From e020add415af6c0c5a84b308fd480dbb1a09b816 Mon Sep 17 00:00:00 2001 From: kyle-verhoog Date: Mon, 26 Nov 2018 19:41:19 +0100 Subject: [PATCH 03/11] [core] replace set_service_info, remove integration test --- ddtrace/tracer.py | 26 ++++++++++++++++++++++---- tests/test_integration.py | 32 -------------------------------- 2 files changed, 22 insertions(+), 36 deletions(-) diff --git a/ddtrace/tracer.py b/ddtrace/tracer.py index 32677295346..05e9ce86354 100644 --- a/ddtrace/tracer.py +++ b/ddtrace/tracer.py @@ -336,14 +336,32 @@ def write(self, spans): def set_service_info(self, service, app, app_type): """Set the information about the given service. - Note: this method no longer performs any sending of service info to the - agent. - :param str service: the internal name of the service (e.g. acme_search, datadog_web) :param str app: the off the shelf name of the application (e.g. rails, postgres, custom-app) :param str app_type: the type of the application (e.g. db, web) """ - pass + try: + # don't bother sending the same services over and over. + info = (service, app, app_type) + if self._services.get(service, None) == info: + return + self._services[service] = info + + if self.debug_logging: + log.debug("set_service_info: service:%s app:%s type:%s", service, app, app_type) + + # If we had changes, send them to the writer. + if self.enabled and self.writer: + + # translate to the form the server understands. + services = {} + for service, app, app_type in self._services.values(): + services[service] = {"app" : app, "app_type" : app_type} + + # queue them for writes. + self.writer.write(services=services) + except Exception: + log.debug("error setting service info", exc_info=True) def wrap(self, name=None, service=None, resource=None, span_type=None): """ diff --git a/tests/test_integration.py b/tests/test_integration.py index 896d5952a76..0768af2cc84 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -152,38 +152,6 @@ def test_worker_single_trace_multiple_spans(self): eq_(payload[0][0]['name'], 'client.testing') eq_(payload[0][1]['name'], 'client.testing') - def test_worker_single_service(self): - # service must be sent correctly - tracer = self.tracer - tracer.set_service_info('client.service', 'django', 'web') - tracer.trace('client.testing').finish() - - # expect a call for traces and services - self._wait_thread_flush() - eq_(self.api._put.call_count, 2) - # check and retrieve the right call - endpoint, payload = self._get_endpoint_payload(self.api._put.call_args_list, '/v0.3/services') - eq_(endpoint, '/v0.3/services') - eq_(len(payload.keys()), 1) - eq_(payload['client.service'], {'app': 'django', 'app_type': 'web'}) - - def test_worker_service_called_multiple_times(self): - # service must be sent correctly - tracer = self.tracer - tracer.set_service_info('backend', 'django', 'web') - tracer.set_service_info('database', 'postgres', 'db') - tracer.trace('client.testing').finish() - - # expect a call for traces and services - self._wait_thread_flush() - eq_(self.api._put.call_count, 2) - # check and retrieve the right call - endpoint, payload = self._get_endpoint_payload(self.api._put.call_args_list, '/v0.3/services') - eq_(endpoint, '/v0.3/services') - eq_(len(payload.keys()), 2) - eq_(payload['backend'], {'app': 'django', 'app_type': 'web'}) - eq_(payload['database'], {'app': 'postgres', 'app_type': 'db'}) - def test_worker_http_error_logging(self): # Tests the logging http error logic tracer = self.tracer From dc6350462b719f553257fc1979452de4baab5062 Mon Sep 17 00:00:00 2001 From: kyle-verhoog Date: Tue, 27 Nov 2018 11:44:12 +0100 Subject: [PATCH 04/11] [tests] ensure api is not called --- tests/test_integration.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/test_integration.py b/tests/test_integration.py index 0768af2cc84..0659a15af94 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -152,6 +152,27 @@ def test_worker_single_trace_multiple_spans(self): eq_(payload[0][0]['name'], 'client.testing') eq_(payload[0][1]['name'], 'client.testing') + def test_worker_single_service(self): + # service must be sent correctly + tracer = self.tracer + tracer.set_service_info('client.service', 'django', 'web') + tracer.trace('client.testing').finish() + + # expect a call for traces and services + self._wait_thread_flush() + eq_(self.api._put.call_count, 0) + + def test_worker_service_called_multiple_times(self): + # service must be sent correctly + tracer = self.tracer + tracer.set_service_info('backend', 'django', 'web') + tracer.set_service_info('database', 'postgres', 'db') + tracer.trace('client.testing').finish() + + # expect a call for traces and services + self._wait_thread_flush() + eq_(self.api._put.call_count, 0) + def test_worker_http_error_logging(self): # Tests the logging http error logic tracer = self.tracer From 9b53b7e9b99786f217bffe5b3052f700f3c5ef05 Mon Sep 17 00:00:00 2001 From: brettlangdon Date: Wed, 28 Nov 2018 15:26:41 -0500 Subject: [PATCH 05/11] Upgrade flake8 to 3.5.0 --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index ac3049975a3..76c165df15c 100644 --- a/tox.ini +++ b/tox.ini @@ -372,7 +372,7 @@ deps= ignore_outcome=true [testenv:flake8] -deps=flake8==3.2.0 +deps=flake8==3.5.0 commands=flake8 ddtrace basepython=python2 From b33899d1d91caf476e9d3da368501a4e69e78f4c Mon Sep 17 00:00:00 2001 From: brettlangdon Date: Wed, 28 Nov 2018 15:26:59 -0500 Subject: [PATCH 06/11] Fix bare except flake8 issues --- ddtrace/contrib/cassandra/session.py | 4 ++-- ddtrace/contrib/grpc/client_interceptor.py | 4 ++-- ddtrace/contrib/pylons/middleware.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ddtrace/contrib/cassandra/session.py b/ddtrace/contrib/cassandra/session.py index 39271057ff7..7880b5303a9 100644 --- a/ddtrace/contrib/cassandra/session.py +++ b/ddtrace/contrib/cassandra/session.py @@ -106,7 +106,7 @@ def traced_start_fetching_next_page(func, instance, args, kwargs): setattr(instance, CURRENT_SPAN, span) try: return func(*args, **kwargs) - except: + except Exception: with span: span.set_exc_info(*sys.exc_info()) raise @@ -161,7 +161,7 @@ def traced_execute_async(func, instance, args, kwargs): ) result.clear_callbacks() return result - except: + except Exception: with span: span.set_exc_info(*sys.exc_info()) raise diff --git a/ddtrace/contrib/grpc/client_interceptor.py b/ddtrace/contrib/grpc/client_interceptor.py index b05cdfee167..a3fd8affb10 100644 --- a/ddtrace/contrib/grpc/client_interceptor.py +++ b/ddtrace/contrib/grpc/client_interceptor.py @@ -34,7 +34,7 @@ def intercept_unary_stream(self, continuation, client_call_details, request): new_details = inject_span(span, client_call_details) try: return continuation(new_details, request) - except: + except Exception: span.set_traceback() raise @@ -48,6 +48,6 @@ def intercept_stream_stream(self, continuation, client_call_details, request_ite new_details = inject_span(span, client_call_details) try: return continuation(new_details, request_iterator) - except: + except Exception: span.set_traceback() raise diff --git a/ddtrace/contrib/pylons/middleware.py b/ddtrace/contrib/pylons/middleware.py index f103f2da36b..19c6ccf7f47 100644 --- a/ddtrace/contrib/pylons/middleware.py +++ b/ddtrace/contrib/pylons/middleware.py @@ -73,7 +73,7 @@ def _start_response(status, *args, **kwargs): code = int(code) if not 100 <= code < 600: code = 500 - except: + except Exception: code = 500 span.set_tag(http.STATUS_CODE, code) span.error = 1 From 11ff2ef9107975f2d471ebe404fd10220d6c6a27 Mon Sep 17 00:00:00 2001 From: kyle-verhoog Date: Thu, 29 Nov 2018 12:59:46 +0100 Subject: [PATCH 07/11] [tests] api call_count should be 1 for the trace --- tests/test_integration.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_integration.py b/tests/test_integration.py index 0659a15af94..6025fab8bad 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -160,7 +160,7 @@ def test_worker_single_service(self): # expect a call for traces and services self._wait_thread_flush() - eq_(self.api._put.call_count, 0) + eq_(self.api._put.call_count, 1) def test_worker_service_called_multiple_times(self): # service must be sent correctly @@ -171,7 +171,7 @@ def test_worker_service_called_multiple_times(self): # expect a call for traces and services self._wait_thread_flush() - eq_(self.api._put.call_count, 0) + eq_(self.api._put.call_count, 1) def test_worker_http_error_logging(self): # Tests the logging http error logic From 57dd1ae5c60620806553909feda266f5a6f25a11 Mon Sep 17 00:00:00 2001 From: Brett Langdon Date: Thu, 29 Nov 2018 09:28:13 -0500 Subject: [PATCH 08/11] remove flake8 ignores and fix issues (#744) * fix all E302 flake8 errors * Remove all flake8 ignores and fix issues --- ddtrace/api.py | 6 +++-- ddtrace/commands/ddtrace_run.py | 4 +-- ddtrace/contrib/bottle/patch.py | 3 ++- ddtrace/contrib/bottle/trace.py | 1 + ddtrace/contrib/cassandra/session.py | 16 ++++++++++- ddtrace/contrib/celery/signals.py | 2 +- ddtrace/contrib/celery/task.py | 1 + ddtrace/contrib/dbapi/__init__.py | 1 + ddtrace/contrib/django/cache.py | 5 ++-- ddtrace/contrib/django/conf.py | 6 +++-- ddtrace/contrib/django/middleware.py | 9 ++++++- ddtrace/contrib/django/patch.py | 2 ++ ddtrace/contrib/django/templates.py | 3 ++- ddtrace/contrib/elasticsearch/quantize.py | 1 + ddtrace/contrib/falcon/patch.py | 1 + ddtrace/contrib/flask/middleware.py | 2 ++ ddtrace/contrib/flask/patch.py | 2 +- ddtrace/contrib/flask_cache/utils.py | 1 + ddtrace/contrib/futures/threading.py | 1 + ddtrace/contrib/gevent/greenlet.py | 1 + ddtrace/contrib/grpc/client_interceptor.py | 1 + ddtrace/contrib/grpc/patch.py | 6 +++++ ddtrace/contrib/grpc/propagation.py | 2 ++ ddtrace/contrib/httplib/patch.py | 2 +- ddtrace/contrib/kombu/patch.py | 2 +- ddtrace/contrib/mongoengine/__init__.py | 2 -- ddtrace/contrib/mongoengine/patch.py | 3 ++- ddtrace/contrib/mysql/patch.py | 8 ++++-- ddtrace/contrib/mysqldb/patch.py | 2 +- ddtrace/contrib/psycopg/connection.py | 18 ++++++++----- ddtrace/contrib/psycopg/patch.py | 9 ++++--- ddtrace/contrib/pylibmc/addrs.py | 3 +-- ddtrace/contrib/pylibmc/patch.py | 2 +- ddtrace/contrib/pymongo/client.py | 2 ++ ddtrace/contrib/pymongo/parse.py | 28 +++++++++++--------- ddtrace/contrib/pymongo/patch.py | 2 +- ddtrace/contrib/pyramid/patch.py | 3 +++ ddtrace/contrib/pyramid/trace.py | 3 ++- ddtrace/contrib/redis/patch.py | 6 ++++- ddtrace/contrib/requests/connection.py | 2 +- ddtrace/contrib/requests/patch.py | 2 +- ddtrace/contrib/sqlalchemy/engine.py | 3 ++- ddtrace/contrib/sqlite3/patch.py | 5 ++++ ddtrace/encoding.py | 3 ++- ddtrace/ext/apps.py | 1 - ddtrace/ext/errors.py | 7 ++--- ddtrace/ext/http.py | 1 + ddtrace/ext/mongo.py | 2 -- ddtrace/ext/sql.py | 2 +- ddtrace/filters.py | 1 + ddtrace/monkey.py | 9 +++++-- ddtrace/opentracer/helpers.py | 1 + ddtrace/opentracer/propagation/propagator.py | 1 + ddtrace/sampler.py | 2 ++ ddtrace/span.py | 13 ++++----- ddtrace/tracer.py | 2 +- ddtrace/utils/__init__.py | 1 - ddtrace/utils/wrappers.py | 1 - ddtrace/writer.py | 8 +++--- tox.ini | 2 -- 60 files changed, 163 insertions(+), 78 deletions(-) diff --git a/ddtrace/api.py b/ddtrace/api.py index 5330632cf5e..fcdfff7e1e4 100644 --- a/ddtrace/api.py +++ b/ddtrace/api.py @@ -18,14 +18,15 @@ 'compatibility_mode': False, 'fallback': 'v0.3'}, 'v0.3': {'traces': '/v0.3/traces', - 'services': '/v0.3/services', + 'services': '/v0.3/services', 'compatibility_mode': False, 'fallback': 'v0.2'}, 'v0.2': {'traces': '/v0.2/traces', - 'services': '/v0.2/services', + 'services': '/v0.2/services', 'compatibility_mode': True, 'fallback': None}} + def _parse_response_json(response): """ Parse the content of a response object, and return the right type, @@ -48,6 +49,7 @@ def _parse_response_json(response): except (ValueError, TypeError) as err: log.debug("unable to load JSON '%s': %s" % (body, err)) + class API(object): """ Send data to the trace agent using the HTTP protocol and JSON format diff --git a/ddtrace/commands/ddtrace_run.py b/ddtrace/commands/ddtrace_run.py index 89defd7c982..e82e24429f6 100755 --- a/ddtrace/commands/ddtrace_run.py +++ b/ddtrace/commands/ddtrace_run.py @@ -33,6 +33,7 @@ DATADOG_PRIORITY_SAMPLING=true|false : (default: false): enables Priority Sampling. """ # noqa + def _ddtrace_root(): from ddtrace import __file__ return os.path.dirname(__file__) @@ -46,8 +47,7 @@ def _add_bootstrap_to_pythonpath(bootstrap_dir): python_path = os.environ.get('PYTHONPATH', '') if python_path: - new_path = "%s%s%s" % (bootstrap_dir, os.path.pathsep, - os.environ['PYTHONPATH']) + new_path = "%s%s%s" % (bootstrap_dir, os.path.pathsep, os.environ['PYTHONPATH']) os.environ['PYTHONPATH'] = new_path else: os.environ['PYTHONPATH'] = bootstrap_dir diff --git a/ddtrace/contrib/bottle/patch.py b/ddtrace/contrib/bottle/patch.py index b7e38404688..7f57fa579d4 100644 --- a/ddtrace/contrib/bottle/patch.py +++ b/ddtrace/contrib/bottle/patch.py @@ -1,4 +1,3 @@ - import os from .trace import TracePlugin @@ -7,6 +6,7 @@ import wrapt + def patch(): """Patch the bottle.Bottle class """ @@ -16,6 +16,7 @@ def patch(): setattr(bottle, '_datadog_patch', True) wrapt.wrap_function_wrapper('bottle', 'Bottle.__init__', traced_init) + def traced_init(wrapped, instance, args, kwargs): wrapped(*args, **kwargs) diff --git a/ddtrace/contrib/bottle/trace.py b/ddtrace/contrib/bottle/trace.py index 87d4c999ea4..8fc735a2915 100644 --- a/ddtrace/contrib/bottle/trace.py +++ b/ddtrace/contrib/bottle/trace.py @@ -10,6 +10,7 @@ SPAN_TYPE = 'web' + class TracePlugin(object): name = 'trace' api = 2 diff --git a/ddtrace/contrib/cassandra/session.py b/ddtrace/contrib/cassandra/session.py index 7880b5303a9..d00c7fa120a 100644 --- a/ddtrace/contrib/cassandra/session.py +++ b/ddtrace/contrib/cassandra/session.py @@ -25,15 +25,18 @@ # Original connect connect function _connect = cassandra.cluster.Cluster.connect + def patch(): """ patch will add tracing to the cassandra library. """ setattr(cassandra.cluster.Cluster, 'connect', wrapt.FunctionWrapper(_connect, traced_connect)) Pin(service=SERVICE, app=SERVICE, app_type="db").onto(cassandra.cluster.Cluster) + def unpatch(): cassandra.cluster.Cluster.connect = _connect + def traced_connect(func, instance, args, kwargs): session = func(*args, **kwargs) if not isinstance(session.execute, wrapt.FunctionWrapper): @@ -41,6 +44,7 @@ def traced_connect(func, instance, args, kwargs): setattr(session, 'execute_async', wrapt.FunctionWrapper(session.execute_async, traced_execute_async)) return session + def _close_span_on_success(result, future): span = getattr(future, CURRENT_SPAN, None) if not span: @@ -54,11 +58,13 @@ def _close_span_on_success(result, future): span.finish() delattr(future, CURRENT_SPAN) + def traced_set_final_result(func, instance, args, kwargs): result = args[0] _close_span_on_success(result, instance) return func(*args, **kwargs) + def _close_span_on_error(exc, future): span = getattr(future, CURRENT_SPAN, None) if not span: @@ -76,11 +82,13 @@ def _close_span_on_error(exc, future): span.finish() delattr(future, CURRENT_SPAN) + def traced_set_final_exception(func, instance, args, kwargs): exc = args[0] _close_span_on_error(exc, instance) return func(*args, **kwargs) + def traced_start_fetching_next_page(func, instance, args, kwargs): has_more_pages = getattr(instance, 'has_more_pages', True) if not has_more_pages: @@ -111,6 +119,7 @@ def traced_start_fetching_next_page(func, instance, args, kwargs): span.set_exc_info(*sys.exc_info()) raise + def traced_execute_async(func, instance, args, kwargs): cluster = getattr(instance, 'cluster', None) pin = Pin.get_from(cluster) @@ -166,6 +175,7 @@ def traced_execute_async(func, instance, args, kwargs): span.set_exc_info(*sys.exc_info()) raise + def _start_span_and_set_tags(pin, query, session, cluster): service = pin.service tracer = pin.tracer @@ -175,6 +185,7 @@ def _start_span_and_set_tags(pin, query, session, cluster): span.set_tags(_extract_cluster_metas(cluster)) return span + def _extract_session_metas(session): metas = {} @@ -185,6 +196,7 @@ def _extract_session_metas(session): return metas + def _extract_cluster_metas(cluster): metas = {} if deep_getattr(cluster, "metadata.cluster_name"): @@ -194,6 +206,7 @@ def _extract_cluster_metas(cluster): return metas + def _extract_result_metas(result): metas = {} if result is None: @@ -230,6 +243,7 @@ def _extract_result_metas(result): return metas + def _sanitize_query(span, query): # TODO (aaditya): fix this hacky type check. we need it to avoid circular imports t = type(query).__name__ @@ -250,7 +264,7 @@ def _sanitize_query(span, query): elif t == 'str': resource = query else: - resource = 'unknown-query-type' # FIXME[matt] what else do to here? + resource = 'unknown-query-type' # FIXME[matt] what else do to here? span.resource = stringify(resource)[:RESOURCE_MAX_LENGTH] diff --git a/ddtrace/contrib/celery/signals.py b/ddtrace/contrib/celery/signals.py index 2b7357a6ce0..e24e8b0bad8 100644 --- a/ddtrace/contrib/celery/signals.py +++ b/ddtrace/contrib/celery/signals.py @@ -13,10 +13,10 @@ retrieve_span, ) - log = logging.getLogger(__name__) SPAN_TYPE = 'worker' + def trace_prerun(*args, **kwargs): # safe-guard to avoid crashes in case the signals API # changes in Celery diff --git a/ddtrace/contrib/celery/task.py b/ddtrace/contrib/celery/task.py index 4657099aa17..be6c1dd1870 100644 --- a/ddtrace/contrib/celery/task.py +++ b/ddtrace/contrib/celery/task.py @@ -18,6 +18,7 @@ def patch_task(task, pin=None): patch_app(task.app) return task + def unpatch_task(task): """Deprecated API. The new API uses signals that can be deactivated via unpatch() API. This API is now a no-op implementation so it doesn't diff --git a/ddtrace/contrib/dbapi/__init__.py b/ddtrace/contrib/dbapi/__init__.py index f7809bfff7c..849c8b98690 100644 --- a/ddtrace/contrib/dbapi/__init__.py +++ b/ddtrace/contrib/dbapi/__init__.py @@ -153,6 +153,7 @@ def rollback(self, *args, **kwargs): span_name = '{}.{}'.format(self._self_datadog_name, 'rollback') return self._trace_method(self.__wrapped__.rollback, span_name, {}, *args, **kwargs) + def _get_vendor(conn): """ Return the vendor (e.g postgres, mysql) of the given database. diff --git a/ddtrace/contrib/django/cache.py b/ddtrace/contrib/django/cache.py index e0cdba60cfb..68f202145e5 100644 --- a/ddtrace/contrib/django/cache.py +++ b/ddtrace/contrib/django/cache.py @@ -54,8 +54,7 @@ def _trace_operation(fn, method_name): def wrapped(self, *args, **kwargs): # get the original function method method = getattr(self, DATADOG_NAMESPACE.format(method=method_name)) - with tracer.trace('django.cache', - span_type=TYPE, service=cache_service_name) as span: + with tracer.trace('django.cache', span_type=TYPE, service=cache_service_name) as span: # update the resource name and tag the cache backend span.resource = _resource_from_cache_prefix(method_name, self) cache_backend = '{}.{}'.format(self.__module__, self.__class__.__name__) @@ -93,6 +92,7 @@ def _wrap_method(cls, method_name): for method in TRACED_METHODS: _wrap_method(cache, method) + def unpatch_method(cls, method_name): method = getattr(cls, DATADOG_NAMESPACE.format(method=method_name), None) if method is None: @@ -101,6 +101,7 @@ def unpatch_method(cls, method_name): setattr(cls, method_name, method) delattr(cls, DATADOG_NAMESPACE.format(method=method_name)) + def unpatch_cache(): cache_backends = set([cache['BACKEND'] for cache in django_settings.CACHES.values()]) for cache_module in cache_backends: diff --git a/ddtrace/contrib/django/conf.py b/ddtrace/contrib/django/conf.py index f5a23c36400..a23cabc5b55 100644 --- a/ddtrace/contrib/django/conf.py +++ b/ddtrace/contrib/django/conf.py @@ -59,8 +59,10 @@ def import_from_string(val, setting_name): return getattr(module, class_name) except (ImportError, AttributeError) as e: msg = 'Could not import "{}" for setting "{}". {}: {}.'.format( - val, setting_name, - e.__class__.__name__, e + val, + setting_name, + e.__class__.__name__, + e, ) raise ImportError(msg) diff --git a/ddtrace/contrib/django/middleware.py b/ddtrace/contrib/django/middleware.py index 8d6cc378429..78e00a1d2a4 100644 --- a/ddtrace/contrib/django/middleware.py +++ b/ddtrace/contrib/django/middleware.py @@ -47,26 +47,31 @@ def get_middleware_insertion_point(): return MIDDLEWARE, middleware return MIDDLEWARE_CLASSES, getattr(django_settings, MIDDLEWARE_CLASSES, None) + def insert_trace_middleware(): middleware_attribute, middleware = get_middleware_insertion_point() if middleware is not None and TRACE_MIDDLEWARE not in set(middleware): setattr(django_settings, middleware_attribute, type(middleware)((TRACE_MIDDLEWARE,)) + middleware) + def remove_trace_middleware(): _, middleware = get_middleware_insertion_point() if middleware and TRACE_MIDDLEWARE in set(middleware): middleware.remove(TRACE_MIDDLEWARE) + def insert_exception_middleware(): middleware_attribute, middleware = get_middleware_insertion_point() if middleware is not None and EXCEPTION_MIDDLEWARE not in set(middleware): setattr(django_settings, middleware_attribute, middleware + type(middleware)((EXCEPTION_MIDDLEWARE,))) + def remove_exception_middleware(): _, middleware = get_middleware_insertion_point() if middleware and EXCEPTION_MIDDLEWARE in set(middleware): middleware.remove(EXCEPTION_MIDDLEWARE) + class InstrumentationMixin(MiddlewareClass): """ Useful mixin base class for tracing middlewares @@ -88,7 +93,7 @@ def process_exception(self, request, exception): span = _get_req_span(request) if span: span.set_tag(http.STATUS_CODE, '500') - span.set_traceback() # will set the exception info + span.set_traceback() # will set the exception info except Exception: log.debug("error processing exception", exc_info=True) @@ -172,10 +177,12 @@ def _get_req_span(request): """ Return the datadog span from the given request. """ return getattr(request, '_datadog_request_span', None) + def _set_req_span(request, span): """ Set the datadog span on the given request. """ return setattr(request, '_datadog_request_span', span) + def _set_auth_tags(span, request): """ Patch any available auth tags from the request onto the span. """ user = getattr(request, 'user', None) diff --git a/ddtrace/contrib/django/patch.py b/ddtrace/contrib/django/patch.py index 7dafe918dda..633c037713c 100644 --- a/ddtrace/contrib/django/patch.py +++ b/ddtrace/contrib/django/patch.py @@ -2,6 +2,7 @@ import django + def patch(): """Patch the instrumented methods """ @@ -12,6 +13,7 @@ def patch(): _w = wrapt.wrap_function_wrapper _w('django', 'setup', traced_setup) + def traced_setup(wrapped, instance, args, kwargs): from django.conf import settings diff --git a/ddtrace/contrib/django/templates.py b/ddtrace/contrib/django/templates.py index 98504b60f87..a15bf34c028 100644 --- a/ddtrace/contrib/django/templates.py +++ b/ddtrace/contrib/django/templates.py @@ -12,11 +12,11 @@ # 3p from django.template import Template - log = logging.getLogger(__name__) RENDER_ATTR = '_datadog_original_render' + def patch_template(tracer): """ will patch django's template rendering function to include timing and trace information. @@ -42,6 +42,7 @@ def traced_render(self, context): Template.render = traced_render + def unpatch_template(): render = getattr(Template, RENDER_ATTR, None) if render is None: diff --git a/ddtrace/contrib/elasticsearch/quantize.py b/ddtrace/contrib/elasticsearch/quantize.py index 580a8b2a26b..64b688ce392 100644 --- a/ddtrace/contrib/elasticsearch/quantize.py +++ b/ddtrace/contrib/elasticsearch/quantize.py @@ -11,6 +11,7 @@ INDEX_REGEXP = re.compile(r'[0-9]{2,}') INDEX_PLACEHOLDER = r'?' + def quantize(span): """Quantize an elasticsearch span diff --git a/ddtrace/contrib/falcon/patch.py b/ddtrace/contrib/falcon/patch.py index 95f7a18d1c9..091ede6aaec 100644 --- a/ddtrace/contrib/falcon/patch.py +++ b/ddtrace/contrib/falcon/patch.py @@ -19,6 +19,7 @@ def patch(): setattr(falcon, '_datadog_patch', True) wrapt.wrap_function_wrapper('falcon', 'API.__init__', traced_init) + def traced_init(wrapped, instance, args, kwargs): mw = kwargs.pop('middleware', []) service = os.environ.get('DATADOG_SERVICE_NAME') or 'falcon' diff --git a/ddtrace/contrib/flask/middleware.py b/ddtrace/contrib/flask/middleware.py index d3461d587d7..7771c8ab91e 100644 --- a/ddtrace/contrib/flask/middleware.py +++ b/ddtrace/contrib/flask/middleware.py @@ -177,6 +177,7 @@ def _finish_span(self, span, exception=None): span.set_tag(http.METHOD, method) span.finish() + def _set_error_on_span(span, exception): # The 3 next lines might not be strictly required, since `set_traceback` # also get the exception from the sys.exc_info (and fill the error meta). @@ -188,6 +189,7 @@ def _set_error_on_span(span, exception): # so attach the stack trace with `set_traceback`. span.set_traceback() + def _patch_render(tracer): """ patch flask's render template methods with the given tracer. """ # fall back to patching global method diff --git a/ddtrace/contrib/flask/patch.py b/ddtrace/contrib/flask/patch.py index 5ffbf1fd1ef..259825b9abd 100644 --- a/ddtrace/contrib/flask/patch.py +++ b/ddtrace/contrib/flask/patch.py @@ -477,7 +477,7 @@ def outer(wrapped, instance, args, kwargs): app = None if isinstance(sender, flask.Flask): app = sender - for receiver in wrapped(*args ,**kwargs): + for receiver in wrapped(*args, **kwargs): yield wrap_signal(app, signal, receiver) return outer diff --git a/ddtrace/contrib/flask_cache/utils.py b/ddtrace/contrib/flask_cache/utils.py index ca15b61ea49..67f6f9e78cb 100644 --- a/ddtrace/contrib/flask_cache/utils.py +++ b/ddtrace/contrib/flask_cache/utils.py @@ -3,6 +3,7 @@ from ..redis.util import _extract_conn_tags as extract_redis_tags from ..pylibmc.addrs import parse_addresses + def _resource_from_cache_prefix(resource, cache): """ Combine the resource name with the cache prefix (if any) diff --git a/ddtrace/contrib/futures/threading.py b/ddtrace/contrib/futures/threading.py index 039a8f1b5ce..f91aa10c674 100644 --- a/ddtrace/contrib/futures/threading.py +++ b/ddtrace/contrib/futures/threading.py @@ -16,6 +16,7 @@ def _wrap_submit(func, instance, args, kwargs): fn_args = args[1:] return func(_wrap_execution, current_ctx, fn, fn_args, kwargs) + def _wrap_execution(ctx, fn, args, kwargs): """ Intermediate target function that is executed in a new thread; diff --git a/ddtrace/contrib/gevent/greenlet.py b/ddtrace/contrib/gevent/greenlet.py index dc46c4e91da..3c48e490848 100644 --- a/ddtrace/contrib/gevent/greenlet.py +++ b/ddtrace/contrib/gevent/greenlet.py @@ -5,6 +5,7 @@ GEVENT_VERSION = gevent.version_info[0:3] + class TracingMixin(object): def __init__(self, *args, **kwargs): # get the current Context if available diff --git a/ddtrace/contrib/grpc/client_interceptor.py b/ddtrace/contrib/grpc/client_interceptor.py index a3fd8affb10..451ead614e6 100644 --- a/ddtrace/contrib/grpc/client_interceptor.py +++ b/ddtrace/contrib/grpc/client_interceptor.py @@ -3,6 +3,7 @@ from ddtrace import Pin from .propagation import inject_span + class GrpcClientInterceptor( grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor, grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor): diff --git a/ddtrace/contrib/grpc/patch.py b/ddtrace/contrib/grpc/patch.py index 1d60a6887a7..660cbc34903 100644 --- a/ddtrace/contrib/grpc/patch.py +++ b/ddtrace/contrib/grpc/patch.py @@ -6,6 +6,7 @@ from .client_interceptor import GrpcClientInterceptor + def patch(): # patch only once if getattr(grpc, '__datadog_patch', False): @@ -18,6 +19,7 @@ def patch(): _w('grpc', 'insecure_channel', _insecure_channel_with_interceptor) _w('grpc', 'secure_channel', _secure_channel_with_interceptor) + def unpatch(): if not getattr(grpc, '__datadog_patch', False): return @@ -25,6 +27,7 @@ def unpatch(): unwrap(grpc, 'secure_channel') unwrap(grpc, 'insecure_channel') + def _insecure_channel_with_interceptor(wrapped, instance, args, kwargs): channel = wrapped(*args, **kwargs) target = args[0] @@ -32,6 +35,7 @@ def _insecure_channel_with_interceptor(wrapped, instance, args, kwargs): channel = _intercept_channel(channel, host, port) return channel + def _secure_channel_with_interceptor(wrapped, instance, args, kwargs): channel = wrapped(*args, **kwargs) target = args[0] @@ -39,9 +43,11 @@ def _secure_channel_with_interceptor(wrapped, instance, args, kwargs): channel = _intercept_channel(channel, host, port) return channel + def _intercept_channel(channel, host, port): return grpc.intercept_channel(channel, GrpcClientInterceptor(host, port)) + def get_host_port(target): split = target.rsplit(':', 2) diff --git a/ddtrace/contrib/grpc/propagation.py b/ddtrace/contrib/grpc/propagation.py index 07dc6fda8c5..5a7c7bbec0e 100644 --- a/ddtrace/contrib/grpc/propagation.py +++ b/ddtrace/contrib/grpc/propagation.py @@ -1,6 +1,7 @@ import grpc import collections + class ClientCallDetails( collections.namedtuple( '_ClientCallDetails', @@ -10,6 +11,7 @@ class ClientCallDetails( """ # noqa pass + def inject_span(span, client_call_details): """Inject propagation headers in grpc call metadata. Recreates a new object diff --git a/ddtrace/contrib/httplib/patch.py b/ddtrace/contrib/httplib/patch.py index a111dd7e02b..3eef7bf34e7 100644 --- a/ddtrace/contrib/httplib/patch.py +++ b/ddtrace/contrib/httplib/patch.py @@ -74,7 +74,7 @@ def _wrap_putrequest(func, instance, args, kwargs): parsed.netloc, parsed.path, parsed.params, - None, # drop query + None, # drop query parsed.fragment )) diff --git a/ddtrace/contrib/kombu/patch.py b/ddtrace/contrib/kombu/patch.py index dedbfa53cdb..92e3a44bba4 100644 --- a/ddtrace/contrib/kombu/patch.py +++ b/ddtrace/contrib/kombu/patch.py @@ -21,7 +21,7 @@ ) # kombu default settings -config._add('kombu',{ +config._add('kombu', { 'service_name': get_env('kombu', 'service_name', DEFAULT_SERVICE) }) diff --git a/ddtrace/contrib/mongoengine/__init__.py b/ddtrace/contrib/mongoengine/__init__.py index 554802f0877..384c04ab771 100644 --- a/ddtrace/contrib/mongoengine/__init__.py +++ b/ddtrace/contrib/mongoengine/__init__.py @@ -27,5 +27,3 @@ from .patch import patch, trace_mongoengine __all__ = ['patch', 'trace_mongoengine'] - - diff --git a/ddtrace/contrib/mongoengine/patch.py b/ddtrace/contrib/mongoengine/patch.py index a623b806159..7faa72d4cd7 100644 --- a/ddtrace/contrib/mongoengine/patch.py +++ b/ddtrace/contrib/mongoengine/patch.py @@ -10,10 +10,11 @@ def patch(): setattr(mongoengine, 'connect', WrappedConnect(_connect)) + def unpatch(): setattr(mongoengine, 'connect', _connect) + @deprecated(message='Use patching instead (see the docs).', version='1.0.0') def trace_mongoengine(*args, **kwargs): return _connect - diff --git a/ddtrace/contrib/mysql/patch.py b/ddtrace/contrib/mysql/patch.py index 87ee583d6d6..c2235fb1d61 100644 --- a/ddtrace/contrib/mysql/patch.py +++ b/ddtrace/contrib/mysql/patch.py @@ -9,28 +9,32 @@ CONN_ATTR_BY_TAG = { - net.TARGET_HOST : 'server_host', - net.TARGET_PORT : 'server_port', + net.TARGET_HOST: 'server_host', + net.TARGET_PORT: 'server_port', db.USER: 'user', db.NAME: 'database', } + def patch(): wrapt.wrap_function_wrapper('mysql.connector', 'connect', _connect) # `Connect` is an alias for `connect`, patch it too if hasattr(mysql.connector, 'Connect'): mysql.connector.Connect = mysql.connector.connect + def unpatch(): if isinstance(mysql.connector.connect, wrapt.ObjectProxy): mysql.connector.connect = mysql.connector.connect.__wrapped__ if hasattr(mysql.connector, 'Connect'): mysql.connector.Connect = mysql.connector.connect + def _connect(func, instance, args, kwargs): conn = func(*args, **kwargs) return patch_conn(conn) + def patch_conn(conn): tags = {t: getattr(conn, a) for t, a in CONN_ATTR_BY_TAG.items() if getattr(conn, a, '') != ''} diff --git a/ddtrace/contrib/mysqldb/patch.py b/ddtrace/contrib/mysqldb/patch.py index 40b8f282623..8979ebb9c56 100644 --- a/ddtrace/contrib/mysqldb/patch.py +++ b/ddtrace/contrib/mysqldb/patch.py @@ -10,13 +10,13 @@ from ...ext import net, db, AppTypes from ...utils.wrappers import unwrap as _u - KWPOS_BY_TAG = { net.TARGET_HOST: ('host', 0), db.USER: ('user', 1), db.NAME: ('db', 3), } + def patch(): # patch only once if getattr(MySQLdb, '__datadog_patch', False): diff --git a/ddtrace/contrib/psycopg/connection.py b/ddtrace/contrib/psycopg/connection.py index 09550c58342..17440d1fdcf 100644 --- a/ddtrace/contrib/psycopg/connection.py +++ b/ddtrace/contrib/psycopg/connection.py @@ -30,9 +30,11 @@ def connection_factory(tracer, service="postgres"): app_type=AppTypes.db, ) - return functools.partial(TracedConnection, + return functools.partial( + TracedConnection, datadog_tracer=tracer, - datadog_service=service) + datadog_service=service, + ) class TracedCursor(cursor): @@ -83,13 +85,15 @@ def __init__(self, *args, **kwargs): net.TARGET_PORT: dsn.get("port"), db.NAME: dsn.get("dbname"), db.USER: dsn.get("user"), - "db.application" : dsn.get("application_name"), + "db.application": dsn.get("application_name"), } - self._datadog_cursor_class = functools.partial(TracedCursor, - datadog_tracer=self._datadog_tracer, - datadog_service=self._datadog_service, - datadog_tags=self._datadog_tags) + self._datadog_cursor_class = functools.partial( + TracedCursor, + datadog_tracer=self._datadog_tracer, + datadog_service=self._datadog_service, + datadog_tags=self._datadog_tags, + ) def cursor(self, *args, **kwargs): """ register our custom cursor factory """ diff --git a/ddtrace/contrib/psycopg/patch.py b/ddtrace/contrib/psycopg/patch.py index 10ccc3b3e50..bbf1fca19f2 100644 --- a/ddtrace/contrib/psycopg/patch.py +++ b/ddtrace/contrib/psycopg/patch.py @@ -44,7 +44,7 @@ def patch_conn(conn, traced_conn_cls=dbapi.TracedConnection): net.TARGET_PORT: dsn.get("port"), db.NAME: dsn.get("dbname"), db.USER: dsn.get("user"), - "db.application" : dsn.get("application_name"), + "db.application": dsn.get("application_name"), } Pin( @@ -93,6 +93,7 @@ def _unroll_args(obj, scope=None): return func(obj, scope) if scope else func(obj) + def _extensions_quote_ident(func, _, args, kwargs): def _unroll_args(obj, scope=None): return obj, scope @@ -105,6 +106,7 @@ def _unroll_args(obj, scope=None): return func(obj, scope) if scope else func(obj) + def _extensions_adapt(func, _, args, kwargs): adapt = func(*args, **kwargs) if hasattr(adapt, 'prepare'): @@ -151,7 +153,6 @@ def prepare(self, *args, **kwargs): # `quote_ident` attribute is only available for psycopg >= 2.7 if getattr(psycopg2, 'extensions', None) and getattr(psycopg2.extensions, 'quote_ident', None): - _psycopg2_extensions += [(psycopg2.extensions.quote_ident, - psycopg2.extensions, 'quote_ident', - _extensions_quote_ident), + _psycopg2_extensions += [ + (psycopg2.extensions.quote_ident, psycopg2.extensions, 'quote_ident', _extensions_quote_ident), ] diff --git a/ddtrace/contrib/pylibmc/addrs.py b/ddtrace/contrib/pylibmc/addrs.py index 69c08f8cb8b..0f11d2ac44c 100644 --- a/ddtrace/contrib/pylibmc/addrs.py +++ b/ddtrace/contrib/pylibmc/addrs.py @@ -1,5 +1,3 @@ - - translate_server_specs = None try: @@ -9,6 +7,7 @@ except ImportError: pass + def parse_addresses(addrs): if not translate_server_specs: return [] diff --git a/ddtrace/contrib/pylibmc/patch.py b/ddtrace/contrib/pylibmc/patch.py index 321035b8ea3..bf1606a6276 100644 --- a/ddtrace/contrib/pylibmc/patch.py +++ b/ddtrace/contrib/pylibmc/patch.py @@ -9,6 +9,6 @@ def patch(): setattr(pylibmc, 'Client', TracedClient) + def unpatch(): setattr(pylibmc, 'Client', _Client) - diff --git a/ddtrace/contrib/pymongo/client.py b/ddtrace/contrib/pymongo/client.py index caaae27a2f3..30069410bce 100644 --- a/ddtrace/contrib/pymongo/client.py +++ b/ddtrace/contrib/pymongo/client.py @@ -234,12 +234,14 @@ def normalize_filter(f=None): # least it won't crash. return {} + def _set_address_tags(span, address): # the address is only set after the cursor is done. if address: span.set_tag(netx.TARGET_HOST, address[0]) span.set_tag(netx.TARGET_PORT, address[1]) + def _set_query_metadata(span, cmd): """ Sets span `mongodb.query` tag and resource given command query """ if cmd.query: diff --git a/ddtrace/contrib/pymongo/parse.py b/ddtrace/contrib/pymongo/parse.py index 0faf368f2b2..e9d31159727 100644 --- a/ddtrace/contrib/pymongo/parse.py +++ b/ddtrace/contrib/pymongo/parse.py @@ -18,17 +18,17 @@ # MongoDB wire protocol commands # http://docs.mongodb.com/manual/reference/mongodb-wire-protocol OP_CODES = { - 1 : "reply", - 1000 : "msg", - 2001 : "update", - 2002 : "insert", - 2003 : "reserved", - 2004 : "query", - 2005 : "get_more", - 2006 : "delete", - 2007 : "kill_cursors", - 2010 : "command", - 2011 : "command_reply", + 1: "reply", + 1000: "msg", + 2001: "update", + 2002: "insert", + 2003: "reserved", + 2004: "query", + 2005: "get_more", + 2006: "delete", + 2007: "kill_cursors", + 2010: "command", + 2011: "command_reply", } # The maximum message length we'll try to parse @@ -115,6 +115,7 @@ def parse_msg(msg_bytes): cmd.metrics[netx.BYTES_OUT] = msg_len return cmd + def parse_query(query): """ Return a command parsed from the given mongo db query. """ db, coll = None, None @@ -132,6 +133,7 @@ def parse_query(query): cmd.query = query.spec return cmd + def parse_spec(spec, db=None): """ Return a Command that has parsed the relevant detail for the given pymongo SON spec. @@ -144,7 +146,7 @@ def parse_spec(spec, db=None): name, coll = items[0] cmd = Command(name, db, coll) - if 'ordered' in spec: # in insert and update + if 'ordered' in spec: # in insert and update cmd.tags['mongodb.ordered'] = spec['ordered'] if cmd.name == 'insert': @@ -165,10 +167,12 @@ def parse_spec(spec, db=None): return cmd + def _cstring(raw): """ Return the first null terminated cstring from the bufffer. """ return ctypes.create_string_buffer(raw).value + def _split_namespace(ns): """ Return a tuple of (db, collecton) from the "db.coll" string. """ if ns: diff --git a/ddtrace/contrib/pymongo/patch.py b/ddtrace/contrib/pymongo/patch.py index f4e04eae03d..2175c8d56a1 100644 --- a/ddtrace/contrib/pymongo/patch.py +++ b/ddtrace/contrib/pymongo/patch.py @@ -9,6 +9,6 @@ def patch(): setattr(pymongo, 'MongoClient', TracedMongoClient) + def unpatch(): setattr(pymongo, 'MongoClient', _MongoClient) - diff --git a/ddtrace/contrib/pyramid/patch.py b/ddtrace/contrib/pyramid/patch.py index 4a9b7b72b2f..da3d2779581 100644 --- a/ddtrace/contrib/pyramid/patch.py +++ b/ddtrace/contrib/pyramid/patch.py @@ -11,6 +11,7 @@ DD_PATCH = '_datadog_patch' + def patch(): """ Patch pyramid.config.Configurator @@ -22,6 +23,7 @@ def patch(): _w = wrapt.wrap_function_wrapper _w('pyramid.config', 'Configurator.__init__', traced_init) + def traced_init(wrapped, instance, args, kwargs): settings = kwargs.pop('settings', {}) service = os.environ.get('DATADOG_SERVICE_NAME') or 'pyramid' @@ -45,6 +47,7 @@ def traced_init(wrapped, instance, args, kwargs): wrapped(*args, **kwargs) trace_pyramid(instance) + def insert_tween_if_needed(settings): tweens = settings.get('pyramid.tweens') # If the list is empty, pyramid does not consider the tweens have been diff --git a/ddtrace/contrib/pyramid/trace.py b/ddtrace/contrib/pyramid/trace.py index 4abb583b229..d59e75690ea 100644 --- a/ddtrace/contrib/pyramid/trace.py +++ b/ddtrace/contrib/pyramid/trace.py @@ -1,4 +1,3 @@ - # 3p import logging import pyramid.renderers @@ -27,6 +26,7 @@ def trace_pyramid(config): config.include('ddtrace.contrib.pyramid') + def includeme(config): # Add our tween just before the default exception handler config.add_tween(DD_TWEEN_NAME, over=pyramid.tweens.EXCVIEW) @@ -51,6 +51,7 @@ def trace_render(func, instance, args, kwargs): span.span_type = http.TEMPLATE return func(*args, **kwargs) + def trace_tween_factory(handler, registry): # configuration settings = registry.settings diff --git a/ddtrace/contrib/redis/patch.py b/ddtrace/contrib/redis/patch.py index 2ee34cba472..7aa2d30a4a2 100644 --- a/ddtrace/contrib/redis/patch.py +++ b/ddtrace/contrib/redis/patch.py @@ -34,6 +34,7 @@ def patch(): _w('redis.client', 'Pipeline.immediate_execute_command', traced_execute_command) Pin(service=redisx.DEFAULT_SERVICE, app=redisx.APP, app_type=AppTypes.db).onto(redis.StrictRedis) + def unpatch(): if getattr(redis, '_datadog_patch', False): setattr(redis, '_datadog_patch', False) @@ -50,10 +51,10 @@ def unpatch(): unwrap(redis.client.Pipeline, 'execute') unwrap(redis.client.Pipeline, 'immediate_execute_command') + # # tracing functions # - def traced_execute_command(func, instance, args, kwargs): pin = Pin.get_from(instance) if not pin or not pin.enabled(): @@ -70,6 +71,7 @@ def traced_execute_command(func, instance, args, kwargs): # run the command return func(*args, **kwargs) + def traced_pipeline(func, instance, args, kwargs): pipeline = func(*args, **kwargs) pin = Pin.get_from(instance) @@ -77,6 +79,7 @@ def traced_pipeline(func, instance, args, kwargs): pin.onto(pipeline) return pipeline + def traced_execute_pipeline(func, instance, args, kwargs): pin = Pin.get_from(instance) if not pin or not pin.enabled(): @@ -93,5 +96,6 @@ def traced_execute_pipeline(func, instance, args, kwargs): s.set_metric(redisx.PIPELINE_LEN, len(instance.command_stack)) return func(*args, **kwargs) + def _get_tags(conn): return _extract_conn_tags(conn.connection_pool.connection_kwargs) diff --git a/ddtrace/contrib/requests/connection.py b/ddtrace/contrib/requests/connection.py index ce7d7333aca..2a8b82350a0 100644 --- a/ddtrace/contrib/requests/connection.py +++ b/ddtrace/contrib/requests/connection.py @@ -64,7 +64,7 @@ def _wrap_send(func, instance, args, kwargs): parsed_uri.netloc, parsed_uri.path, parsed_uri.params, - None, # drop parsed_uri.query + None, # drop parsed_uri.query parsed_uri.fragment )) diff --git a/ddtrace/contrib/requests/patch.py b/ddtrace/contrib/requests/patch.py index ccc127aaf7c..30b95c9efb4 100644 --- a/ddtrace/contrib/requests/patch.py +++ b/ddtrace/contrib/requests/patch.py @@ -13,7 +13,7 @@ from ...ext import AppTypes # requests default settings -config._add('requests',{ +config._add('requests', { 'service_name': get_env('requests', 'service_name', DEFAULT_SERVICE), 'distributed_tracing': asbool(get_env('requests', 'distributed_tracing', False)), 'split_by_domain': asbool(get_env('requests', 'split_by_domain', False)), diff --git a/ddtrace/contrib/sqlalchemy/engine.py b/ddtrace/contrib/sqlalchemy/engine.py index 2697e551ae0..ceff9493eef 100644 --- a/ddtrace/contrib/sqlalchemy/engine.py +++ b/ddtrace/contrib/sqlalchemy/engine.py @@ -122,6 +122,7 @@ def _dbapi_error(self, conn, cursor, statement, *args): finally: span.finish() + def _set_tags_from_url(span, url): """ set connection tags from the url. return true if successful. """ if url.host: @@ -133,6 +134,7 @@ def _set_tags_from_url(span, url): return bool(span.get_tag(netx.TARGET_HOST)) + def _set_tags_from_cursor(span, vendor, cursor): """ attempt to set db connection tags by introspecting the cursor. """ if 'postgres' == vendor: @@ -143,4 +145,3 @@ def _set_tags_from_cursor(span, vendor, cursor): span.set_tag(sqlx.DB, d.get("dbname")) span.set_tag(netx.TARGET_HOST, d.get("host")) span.set_tag(netx.TARGET_PORT, d.get("port")) - diff --git a/ddtrace/contrib/sqlite3/patch.py b/ddtrace/contrib/sqlite3/patch.py index afed6c218fa..29f01d6836c 100644 --- a/ddtrace/contrib/sqlite3/patch.py +++ b/ddtrace/contrib/sqlite3/patch.py @@ -12,25 +12,30 @@ # Original connect method _connect = sqlite3.connect + def patch(): wrapped = wrapt.FunctionWrapper(_connect, traced_connect) setattr(sqlite3, 'connect', wrapped) setattr(sqlite3.dbapi2, 'connect', wrapped) + def unpatch(): sqlite3.connect = _connect sqlite3.dbapi2.connect = _connect + def traced_connect(func, _, args, kwargs): conn = func(*args, **kwargs) return patch_conn(conn) + def patch_conn(conn): wrapped = TracedSQLite(conn) Pin(service="sqlite", app="sqlite", app_type=AppTypes.db).onto(wrapped) return wrapped + class TracedSQLite(TracedConnection): def execute(self, *args, **kwargs): diff --git a/ddtrace/encoding.py b/ddtrace/encoding.py index 51779334312..7d96260c758 100644 --- a/ddtrace/encoding.py +++ b/ddtrace/encoding.py @@ -11,7 +11,7 @@ from msgpack._unpacker import unpack, unpackb, Unpacker # noqa from msgpack._version import version # use_bin_type kwarg only exists since msgpack-python v0.4.0 - MSGPACK_PARAMS = { 'use_bin_type': True } if version >= (0, 4, 0) else {} + MSGPACK_PARAMS = {'use_bin_type': True} if version >= (0, 4, 0) else {} MSGPACK_ENCODING = True except ImportError: # fallback to JSON @@ -80,6 +80,7 @@ def __init__(self): def _encode(self, obj): return msgpack.packb(obj, **MSGPACK_PARAMS) + def get_encoder(): """ Switching logic that choose the best encoder for the API transport. diff --git a/ddtrace/ext/apps.py b/ddtrace/ext/apps.py index 8b137891791..e69de29bb2d 100644 --- a/ddtrace/ext/apps.py +++ b/ddtrace/ext/apps.py @@ -1 +0,0 @@ - diff --git a/ddtrace/ext/errors.py b/ddtrace/ext/errors.py index 66e63184439..e8527b9d469 100644 --- a/ddtrace/ext/errors.py +++ b/ddtrace/ext/errors.py @@ -5,15 +5,16 @@ import traceback -ERROR_MSG = "error.msg" # a string representing the error message -ERROR_TYPE = "error.type" # a string representing the type of the error -ERROR_STACK = "error.stack" # a human readable version of the stack. beta. +ERROR_MSG = "error.msg" # a string representing the error message +ERROR_TYPE = "error.type" # a string representing the type of the error +ERROR_STACK = "error.stack" # a human readable version of the stack. beta. # shorthand for -----^ MSG = ERROR_MSG TYPE = ERROR_TYPE STACK = ERROR_STACK + def get_traceback(tb=None, error=None): t = None if error: diff --git a/ddtrace/ext/http.py b/ddtrace/ext/http.py index fb4a7a89b82..5a19851f075 100644 --- a/ddtrace/ext/http.py +++ b/ddtrace/ext/http.py @@ -18,5 +18,6 @@ # template render span type TEMPLATE = 'template' + def normalize_status_code(code): return code.split(' ')[0] diff --git a/ddtrace/ext/mongo.py b/ddtrace/ext/mongo.py index 9815dbc7a5d..5a536d3cd57 100644 --- a/ddtrace/ext/mongo.py +++ b/ddtrace/ext/mongo.py @@ -1,8 +1,6 @@ - TYPE = 'mongodb' COLLECTION = 'mongodb.collection' DB = 'mongodb.db' ROWS = 'mongodb.rows' QUERY = 'mongodb.query' - diff --git a/ddtrace/ext/sql.py b/ddtrace/ext/sql.py index 07cec1cfbee..95e325fe280 100644 --- a/ddtrace/ext/sql.py +++ b/ddtrace/ext/sql.py @@ -1,4 +1,3 @@ - from ddtrace.ext import AppTypes @@ -23,6 +22,7 @@ def normalize_vendor(vendor): else: return vendor + def parse_pg_dsn(dsn): """ Return a dictionary of the components of a postgres DSN. diff --git a/ddtrace/filters.py b/ddtrace/filters.py index 48be4eecee6..b6abef2e5f1 100644 --- a/ddtrace/filters.py +++ b/ddtrace/filters.py @@ -2,6 +2,7 @@ from .ext import http + class FilterRequestsOnUrl(object): """Filter out traces from incoming http requests based on the request's url. This class takes as argument a list of regular expression patterns diff --git a/ddtrace/monkey.py b/ddtrace/monkey.py index 29bfc15c84c..6eef0a8d9b3 100644 --- a/ddtrace/monkey.py +++ b/ddtrace/monkey.py @@ -99,6 +99,7 @@ def patch_all(**patch_modules): patch(raise_errors=False, **modules) + def patch(raise_errors=True, **patch_modules): """Patch only a set of given modules. @@ -125,10 +126,12 @@ def patch(raise_errors=True, **patch_modules): patch_module(module, raise_errors=raise_errors) patched_modules = get_patched_modules() - log.info("patched %s/%s modules (%s)", + log.info( + "patched %s/%s modules (%s)", len(patched_modules), len(modules), - ",".join(patched_modules)) + ",".join(patched_modules), + ) def patch_module(module, raise_errors=True): @@ -144,11 +147,13 @@ def patch_module(module, raise_errors=True): log.debug("failed to patch %s: %s", module, exc) return False + def get_patched_modules(): """Get the list of patched modules""" with _LOCK: return sorted(_PATCHED_MODULES) + def _patch_module(module): """_patch_module will attempt to monkey patch the module. diff --git a/ddtrace/opentracer/helpers.py b/ddtrace/opentracer/helpers.py index ff12887117a..f088c3f791a 100644 --- a/ddtrace/opentracer/helpers.py +++ b/ddtrace/opentracer/helpers.py @@ -5,6 +5,7 @@ Helper routines for Datadog OpenTracing. """ + def set_global_tracer(tracer): """Sets the global tracers to the given tracer.""" diff --git a/ddtrace/opentracer/propagation/propagator.py b/ddtrace/opentracer/propagation/propagator.py index 361e4dd573d..b7f7cda8992 100644 --- a/ddtrace/opentracer/propagation/propagator.py +++ b/ddtrace/opentracer/propagation/propagator.py @@ -3,6 +3,7 @@ # ref: https://stackoverflow.com/a/38668373 ABC = ABCMeta('ABC', (object,), {'__slots__': ()}) + class Propagator(ABC): @abstractmethod diff --git a/ddtrace/sampler.py b/ddtrace/sampler.py index c37eefd5baa..157d72aeb6d 100644 --- a/ddtrace/sampler.py +++ b/ddtrace/sampler.py @@ -15,6 +15,7 @@ # Has to be the same factor and key as the Agent to allow chained sampling KNUTH_FACTOR = 1111111111111111111 + class AllSampler(object): """Sampler sampling all the traces""" @@ -58,6 +59,7 @@ def _key(service=None, env=None): _default_key = _key() + class RateByServiceSampler(object): """Sampler based on a rate, by service diff --git a/ddtrace/span.py b/ddtrace/span.py index d0ce098fea4..1ece6e99af8 100644 --- a/ddtrace/span.py +++ b/ddtrace/span.py @@ -189,12 +189,12 @@ def get_metric(self, key): def to_dict(self): d = { - 'trace_id' : self.trace_id, - 'parent_id' : self.parent_id, - 'span_id' : self.span_id, + 'trace_id': self.trace_id, + 'parent_id': self.parent_id, + 'span_id': self.span_id, 'service': self.service, - 'resource' : self.resource, - 'name' : self.name, + 'resource': self.resource, + 'name': self.name, 'error': self.error, } @@ -237,7 +237,7 @@ def set_traceback(self, limit=20): def set_exc_info(self, exc_type, exc_val, exc_tb): """ Tag the span with an error tuple as from `sys.exc_info()`. """ if not (exc_type and exc_val and exc_tb): - return # nothing to do + return # nothing to do self.error = 1 @@ -311,6 +311,7 @@ def __repr__(self): self.name, ) + def _new_id(): """Generate a random trace_id or span_id""" return random.getrandbits(64) diff --git a/ddtrace/tracer.py b/ddtrace/tracer.py index 204d5cc1185..c9b90bf6d86 100644 --- a/ddtrace/tracer.py +++ b/ddtrace/tracer.py @@ -356,7 +356,7 @@ def set_service_info(self, service, app, app_type): # translate to the form the server understands. services = {} for service, app, app_type in self._services.values(): - services[service] = {"app" : app, "app_type" : app_type} + services[service] = {"app": app, "app_type": app_type} # queue them for writes. self.writer.write(services=services) diff --git a/ddtrace/utils/__init__.py b/ddtrace/utils/__init__.py index 5ce8a01aa57..420f72ee644 100644 --- a/ddtrace/utils/__init__.py +++ b/ddtrace/utils/__init__.py @@ -1,4 +1,3 @@ - # https://stackoverflow.com/a/26853961 def merge_dicts(x, y): """Returns a copy of y merged into x.""" diff --git a/ddtrace/utils/wrappers.py b/ddtrace/utils/wrappers.py index ce543800069..2e40cf480cd 100644 --- a/ddtrace/utils/wrappers.py +++ b/ddtrace/utils/wrappers.py @@ -62,4 +62,3 @@ def _get_original_method(thing, key): setattr(patchable, key, dest) elif hasattr(patchable, '__class__'): setattr(patchable, key, dest.__get__(patchable, patchable.__class__)) - diff --git a/ddtrace/writer.py b/ddtrace/writer.py index a248a466af2..f8fe26ec735 100644 --- a/ddtrace/writer.py +++ b/ddtrace/writer.py @@ -1,4 +1,3 @@ - # stdlib import atexit import logging @@ -116,8 +115,11 @@ def _on_shutdown(self): size = self._trace_queue.size() if size: key = "ctrl-break" if os.name == 'nt' else 'ctrl-c' - log.debug("Waiting %ss for traces to be sent. Hit %s to quit.", - self._shutdown_timeout, key) + log.debug( + "Waiting %ss for traces to be sent. Hit %s to quit.", + self._shutdown_timeout, + key, + ) timeout = time.time() + self._shutdown_timeout while time.time() < timeout and self._trace_queue.size(): # FIXME[matt] replace with a queue join diff --git a/tox.ini b/tox.ini index 76c165df15c..67797da38ea 100644 --- a/tox.ini +++ b/tox.ini @@ -610,7 +610,5 @@ setenv = [flake8] -ignore=W391,E231,E201,E202,E203,E261,E302,E128,E126,E124,W503 - max-line-length=120 exclude=tests From ca016010017c607ef1b7d981ec24952e0449abbc Mon Sep 17 00:00:00 2001 From: Kyle Verhoog Date: Mon, 3 Dec 2018 17:47:09 +0100 Subject: [PATCH 09/11] 0.17.1 version bump (#753) --- ddtrace/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddtrace/__init__.py b/ddtrace/__init__.py index ccdec1a3891..cec9fda339f 100644 --- a/ddtrace/__init__.py +++ b/ddtrace/__init__.py @@ -4,7 +4,7 @@ from .tracer import Tracer from .settings import config -__version__ = '0.17.0' +__version__ = '0.17.1' # a global tracer instance with integration settings tracer = Tracer() From cb245cdf1e443945fbbabfa008d733ee2f489d46 Mon Sep 17 00:00:00 2001 From: Kyle Verhoog Date: Mon, 3 Dec 2018 20:46:19 +0100 Subject: [PATCH 10/11] [cassandra] Fix batched query leak (#714) * [cassandra] use query as resource for batched queries * [cassandra] update batchstatement test * [cassandra] remove query tag --- ddtrace/contrib/cassandra/session.py | 4 +--- tests/contrib/cassandra/test.py | 10 ++++++++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/ddtrace/contrib/cassandra/session.py b/ddtrace/contrib/cassandra/session.py index d00c7fa120a..e407b4a00d4 100644 --- a/ddtrace/contrib/cassandra/session.py +++ b/ddtrace/contrib/cassandra/session.py @@ -253,9 +253,7 @@ def _sanitize_query(span, query): # reset query if a string is available resource = getattr(query, "query_string", query) elif t == 'BatchStatement': - resource = 'BatchStatement' - q = "; ".join(q[1] for q in query._statements_and_parameters[:2]) - span.set_tag("cassandra.query", q) + resource = '; '.join(q[1] for q in query._statements_and_parameters[:2]) span.set_metric("cassandra.batch_size", len(query._statements_and_parameters)) elif t == 'BoundStatement': ps = getattr(query, 'prepared_statement', None) diff --git a/tests/contrib/cassandra/test.py b/tests/contrib/cassandra/test.py index a92519218da..f65d26e2dab 100644 --- a/tests/contrib/cassandra/test.py +++ b/tests/contrib/cassandra/test.py @@ -264,9 +264,15 @@ def test_batch_statement(self): spans = writer.pop() eq_(len(spans), 1) s = spans[0] - eq_(s.resource, 'BatchStatement') + eq_( + s.resource, + ( + 'INSERT INTO test.person_write (name, age, description) VALUES (\'Joe\', 1, \'a\'); ' + 'INSERT INTO test.person_write (name, age, description) VALUES (\'Jane\', 2, \'b\')' + ) + ) eq_(s.get_metric('cassandra.batch_size'), 2) - assert 'test.person' in s.get_tag('cassandra.query') + assert s.get_tag('cassandra.query') is None class TestCassPatchDefault(CassandraBase): From 74f5fb7ff00ea5b5dd511461e456a46641bb2f55 Mon Sep 17 00:00:00 2001 From: Kyle Verhoog Date: Wed, 5 Dec 2018 19:54:37 +0100 Subject: [PATCH 11/11] Revert "[cassandra] Fix batched query leak (#714)" (#765) This reverts commit cb245cdf1e443945fbbabfa008d733ee2f489d46. --- ddtrace/contrib/cassandra/session.py | 4 +++- tests/contrib/cassandra/test.py | 10 ++-------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/ddtrace/contrib/cassandra/session.py b/ddtrace/contrib/cassandra/session.py index e407b4a00d4..d00c7fa120a 100644 --- a/ddtrace/contrib/cassandra/session.py +++ b/ddtrace/contrib/cassandra/session.py @@ -253,7 +253,9 @@ def _sanitize_query(span, query): # reset query if a string is available resource = getattr(query, "query_string", query) elif t == 'BatchStatement': - resource = '; '.join(q[1] for q in query._statements_and_parameters[:2]) + resource = 'BatchStatement' + q = "; ".join(q[1] for q in query._statements_and_parameters[:2]) + span.set_tag("cassandra.query", q) span.set_metric("cassandra.batch_size", len(query._statements_and_parameters)) elif t == 'BoundStatement': ps = getattr(query, 'prepared_statement', None) diff --git a/tests/contrib/cassandra/test.py b/tests/contrib/cassandra/test.py index f65d26e2dab..a92519218da 100644 --- a/tests/contrib/cassandra/test.py +++ b/tests/contrib/cassandra/test.py @@ -264,15 +264,9 @@ def test_batch_statement(self): spans = writer.pop() eq_(len(spans), 1) s = spans[0] - eq_( - s.resource, - ( - 'INSERT INTO test.person_write (name, age, description) VALUES (\'Joe\', 1, \'a\'); ' - 'INSERT INTO test.person_write (name, age, description) VALUES (\'Jane\', 2, \'b\')' - ) - ) + eq_(s.resource, 'BatchStatement') eq_(s.get_metric('cassandra.batch_size'), 2) - assert s.get_tag('cassandra.query') is None + assert 'test.person' in s.get_tag('cassandra.query') class TestCassPatchDefault(CassandraBase):