From 69d1ea3e38cae6113ff6448b70503463ccfcc5ac Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Fri, 22 Nov 2024 16:13:56 -0800 Subject: [PATCH] Fix psycopg2 instrument_connection --- .../instrumentation/psycopg2/__init__.py | 64 ++++++++++++------- 1 file changed, 42 insertions(+), 22 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-psycopg2/src/opentelemetry/instrumentation/psycopg2/__init__.py b/instrumentation/opentelemetry-instrumentation-psycopg2/src/opentelemetry/instrumentation/psycopg2/__init__.py index de2e49f4c3..152560a550 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg2/src/opentelemetry/instrumentation/psycopg2/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg2/src/opentelemetry/instrumentation/psycopg2/__init__.py @@ -106,18 +106,19 @@ from typing import Collection import psycopg2 +import wrapt from psycopg2.extensions import ( cursor as pg_cursor, # pylint: disable=no-name-in-module ) from psycopg2.sql import Composed # pylint: disable=no-name-in-module +from opentelemetry import trace as trace_api from opentelemetry.instrumentation import dbapi from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.psycopg2.package import _instruments from opentelemetry.instrumentation.psycopg2.version import __version__ _logger = logging.getLogger(__name__) -_OTEL_CURSOR_FACTORY_KEY = "_otel_orig_cursor_factory" class Psycopg2Instrumentor(BaseInstrumentor): @@ -130,6 +131,8 @@ class Psycopg2Instrumentor(BaseInstrumentor): _DATABASE_SYSTEM = "postgresql" + _otel_cursor_factory = None + def instrumentation_dependencies(self) -> Collection[str]: return _instruments @@ -158,32 +161,40 @@ def _uninstrument(self, **kwargs): dbapi.unwrap_connect(psycopg2, "connect") # TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql - @staticmethod - def instrument_connection(connection, tracer_provider=None): - if not hasattr(connection, "_is_instrumented_by_opentelemetry"): - connection._is_instrumented_by_opentelemetry = False - - if not connection._is_instrumented_by_opentelemetry: - setattr( - connection, _OTEL_CURSOR_FACTORY_KEY, connection.cursor_factory - ) - connection.cursor_factory = _new_cursor_factory( - tracer_provider=tracer_provider - ) - connection._is_instrumented_by_opentelemetry = True - else: + def instrument_connection( + self, + connection, + tracer_provider: typing.Optional[trace_api.TracerProvider] = None, + enable_commenter: bool = False, + commenter_options: dict = None, + ): + if isinstance(connection, wrapt.ObjectProxy): + # The connection is already instrumented from wrapt.wrap_function_wrapper + # of the psycopg2 module's `connect` method by DB-API `wrap_connect`, + # so the Psycopg2Instrumentor is marked as instrumenting. _logger.warning( "Attempting to instrument Psycopg connection while already instrumented" ) + self._is_instrumented_by_opentelemetry = True + return connection + + # Save cursor_factory at instrumentor level because + # psycopg2 connection type does not allow arbitrary attrs + self._otel_cursor_factory = connection.cursor_factory + connection.cursor_factory = _new_cursor_factory( + base_factory=connection.cursor_factory, + tracer_provider=tracer_provider, + enable_commenter=enable_commenter, + commenter_options=commenter_options, + ) + self._is_instrumented_by_opentelemetry = True + return connection # TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql - @staticmethod - def uninstrument_connection(connection): - connection.cursor_factory = getattr( - connection, _OTEL_CURSOR_FACTORY_KEY, None - ) - + def uninstrument_connection(self, connection): + self._is_instrumented_by_opentelemetry = False + connection.cursor_factory = self._otel_cursor_factory return connection @@ -231,7 +242,13 @@ def get_statement(self, cursor, args): return statement -def _new_cursor_factory(db_api=None, base_factory=None, tracer_provider=None): +def _new_cursor_factory( + db_api: dbapi.DatabaseApiIntegration = None, + base_factory: pg_cursor = None, + tracer_provider: typing.Optional[trace_api.TracerProvider] = None, + enable_commenter: bool = False, + commenter_options: dict = None, +): if not db_api: db_api = DatabaseApiIntegration( __name__, @@ -239,6 +256,9 @@ def _new_cursor_factory(db_api=None, base_factory=None, tracer_provider=None): connection_attributes=Psycopg2Instrumentor._CONNECTION_ATTRIBUTES, version=__version__, tracer_provider=tracer_provider, + enable_commenter=enable_commenter, + commenter_options=commenter_options, + connect_module=psycopg2, ) base_factory = base_factory or pg_cursor