diff --git a/docs/nitpick-exceptions.ini b/docs/nitpick-exceptions.ini index b1fcdd5342..bf120765c9 100644 --- a/docs/nitpick-exceptions.ini +++ b/docs/nitpick-exceptions.ini @@ -41,6 +41,8 @@ py-class= callable Consumer confluent_kafka.Message + psycopg.Connection + psycopg.AsyncConnection ObjectProxy any= diff --git a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py index 27bb3d639d..ba4e5b4fd4 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py @@ -230,15 +230,13 @@ async def async_response_hook(span, request, response): NETWORK_PEER_ADDRESS, NETWORK_PEER_PORT, ) -from opentelemetry.trace import SpanKind, TracerProvider, get_tracer +from opentelemetry.trace import SpanKind, Tracer, TracerProvider, get_tracer from opentelemetry.trace.span import Span from opentelemetry.trace.status import StatusCode from opentelemetry.util.http import remove_url_credentials, sanitize_method _logger = logging.getLogger(__name__) -URL = typing.Tuple[bytes, bytes, typing.Optional[int], bytes] -Headers = typing.List[typing.Tuple[bytes, bytes]] RequestHook = typing.Callable[[Span, "RequestInfo"], None] ResponseHook = typing.Callable[[Span, "RequestInfo", "ResponseInfo"], None] AsyncRequestHook = typing.Callable[ @@ -253,17 +251,15 @@ class RequestInfo(typing.NamedTuple): method: bytes url: httpx.URL headers: httpx.Headers | None - stream: typing.Optional[ - typing.Union[httpx.SyncByteStream, httpx.AsyncByteStream] - ] - extensions: typing.Optional[dict] + stream: httpx.SyncByteStream | httpx.AsyncByteStream | None + extensions: dict[str, typing.Any] | None class ResponseInfo(typing.NamedTuple): status_code: int headers: httpx.Headers | None - stream: typing.Iterable[bytes] - extensions: typing.Optional[dict] + stream: httpx.SyncByteStream | httpx.AsyncByteStream + extensions: dict[str, typing.Any] | None def _get_default_span_name(method: str) -> str: @@ -274,11 +270,19 @@ def _get_default_span_name(method: str) -> str: return method -def _prepare_headers(headers: typing.Optional[Headers]) -> httpx.Headers: +def _prepare_headers(headers: httpx.Headers | None) -> httpx.Headers: return httpx.Headers(headers) -def _extract_parameters(args, kwargs): +def _extract_parameters( + args: tuple[typing.Any, ...], kwargs: dict[str, typing.Any] +) -> tuple[ + bytes, + httpx.URL, + httpx.Headers | None, + httpx.SyncByteStream | httpx.AsyncByteStream | None, + dict[str, typing.Any], +]: if isinstance(args[0], httpx.Request): # In httpx >= 0.20.0, handle_request receives a Request object request: httpx.Request = args[0] @@ -311,10 +315,15 @@ def _inject_propagation_headers(headers, args, kwargs): def _extract_response( - response: typing.Union[ - httpx.Response, typing.Tuple[int, Headers, httpx.SyncByteStream, dict] - ], -) -> typing.Tuple[int, Headers, httpx.SyncByteStream, dict, str]: + response: httpx.Response + | tuple[int, httpx.Headers, httpx.SyncByteStream, dict[str, typing.Any]], +) -> tuple[ + int, + httpx.Headers, + httpx.SyncByteStream | httpx.AsyncByteStream, + dict[str, typing.Any], + str, +]: if isinstance(response, httpx.Response): status_code = response.status_code headers = response.headers @@ -331,8 +340,8 @@ def _extract_response( def _apply_request_client_attributes_to_span( - span_attributes: dict, - url: typing.Union[str, URL, httpx.URL], + span_attributes: dict[str, typing.Any], + url: str | httpx.URL, method_original: str, semconv: _StabilityMode, ): @@ -407,9 +416,9 @@ class SyncOpenTelemetryTransport(httpx.BaseTransport): def __init__( self, transport: httpx.BaseTransport, - tracer_provider: typing.Optional[TracerProvider] = None, - request_hook: typing.Optional[RequestHook] = None, - response_hook: typing.Optional[ResponseHook] = None, + tracer_provider: TracerProvider | None = None, + request_hook: RequestHook | None = None, + response_hook: ResponseHook | None = None, ): _OpenTelemetrySemanticConventionStability._initialize() self._sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( @@ -426,27 +435,27 @@ def __init__( self._request_hook = request_hook self._response_hook = response_hook - def __enter__(self) -> "SyncOpenTelemetryTransport": + def __enter__(self) -> SyncOpenTelemetryTransport: self._transport.__enter__() return self def __exit__( self, - exc_type: typing.Optional[typing.Type[BaseException]] = None, - exc_value: typing.Optional[BaseException] = None, - traceback: typing.Optional[TracebackType] = None, + exc_type: type[BaseException] | None = None, + exc_value: BaseException | None = None, + traceback: TracebackType | None = None, ) -> None: self._transport.__exit__(exc_type, exc_value, traceback) # pylint: disable=R0914 def handle_request( self, - *args, - **kwargs, - ) -> typing.Union[ - typing.Tuple[int, "Headers", httpx.SyncByteStream, dict], - httpx.Response, - ]: + *args: typing.Any, + **kwargs: typing.Any, + ) -> ( + tuple[int, httpx.Headers, httpx.SyncByteStream, dict[str, typing.Any]] + | httpx.Response + ): """Add request info to span.""" if not is_http_instrumentation_enabled(): return self._transport.handle_request(*args, **kwargs) @@ -532,9 +541,9 @@ class AsyncOpenTelemetryTransport(httpx.AsyncBaseTransport): def __init__( self, transport: httpx.AsyncBaseTransport, - tracer_provider: typing.Optional[TracerProvider] = None, - request_hook: typing.Optional[AsyncRequestHook] = None, - response_hook: typing.Optional[AsyncResponseHook] = None, + tracer_provider: TracerProvider | None = None, + request_hook: AsyncRequestHook | None = None, + response_hook: AsyncResponseHook | None = None, ): _OpenTelemetrySemanticConventionStability._initialize() self._sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( @@ -557,19 +566,19 @@ async def __aenter__(self) -> "AsyncOpenTelemetryTransport": async def __aexit__( self, - exc_type: typing.Optional[typing.Type[BaseException]] = None, - exc_value: typing.Optional[BaseException] = None, - traceback: typing.Optional[TracebackType] = None, + exc_type: typing.Type[BaseException] | None = None, + exc_value: BaseException | None = None, + traceback: TracebackType | None = None, ) -> None: await self._transport.__aexit__(exc_type, exc_value, traceback) # pylint: disable=R0914 async def handle_async_request( - self, *args, **kwargs - ) -> typing.Union[ - typing.Tuple[int, "Headers", httpx.AsyncByteStream, dict], - httpx.Response, - ]: + self, *args: typing.Any, **kwargs: typing.Any + ) -> ( + tuple[int, httpx.Headers, httpx.AsyncByteStream, dict[str, typing.Any]] + | httpx.Response + ): """Add request info to span.""" if not is_http_instrumentation_enabled(): return await self._transport.handle_async_request(*args, **kwargs) @@ -653,7 +662,7 @@ class HTTPXClientInstrumentor(BaseInstrumentor): def instrumentation_dependencies(self) -> typing.Collection[str]: return _instruments - def _instrument(self, **kwargs): + def _instrument(self, **kwargs: typing.Any): """Instruments httpx Client and AsyncClient Args: @@ -716,20 +725,20 @@ def _instrument(self, **kwargs): ), ) - def _uninstrument(self, **kwargs): + def _uninstrument(self, **kwargs: typing.Any): unwrap(httpx.HTTPTransport, "handle_request") unwrap(httpx.AsyncHTTPTransport, "handle_async_request") @staticmethod def _handle_request_wrapper( # pylint: disable=too-many-locals - wrapped, - instance, - args, - kwargs, - tracer, - sem_conv_opt_in_mode, - request_hook, - response_hook, + wrapped: typing.Callable[..., typing.Any], + instance: httpx.HTTPTransport, + args: tuple[typing.Any, ...], + kwargs: dict[str, typing.Any], + tracer: Tracer, + sem_conv_opt_in_mode: _StabilityMode, + request_hook: RequestHook, + response_hook: ResponseHook, ): if not is_http_instrumentation_enabled(): return wrapped(*args, **kwargs) @@ -796,14 +805,14 @@ def _handle_request_wrapper( # pylint: disable=too-many-locals @staticmethod async def _handle_async_request_wrapper( # pylint: disable=too-many-locals - wrapped, - instance, - args, - kwargs, - tracer, - sem_conv_opt_in_mode, - async_request_hook, - async_response_hook, + wrapped: typing.Callable[..., typing.Awaitable[typing.Any]], + instance: httpx.AsyncHTTPTransport, + args: tuple[typing.Any, ...], + kwargs: dict[str, typing.Any], + tracer: Tracer, + sem_conv_opt_in_mode: _StabilityMode, + async_request_hook: AsyncRequestHook, + async_response_hook: AsyncResponseHook, ): if not is_http_instrumentation_enabled(): return await wrapped(*args, **kwargs) @@ -872,14 +881,10 @@ async def _handle_async_request_wrapper( # pylint: disable=too-many-locals @classmethod def instrument_client( cls, - client: typing.Union[httpx.Client, httpx.AsyncClient], - tracer_provider: TracerProvider = None, - request_hook: typing.Union[ - typing.Optional[RequestHook], typing.Optional[AsyncRequestHook] - ] = None, - response_hook: typing.Union[ - typing.Optional[ResponseHook], typing.Optional[AsyncResponseHook] - ] = None, + client: httpx.Client | httpx.AsyncClient, + tracer_provider: TracerProvider | None = None, + request_hook: RequestHook | AsyncRequestHook | None = None, + response_hook: ResponseHook | AsyncResponseHook | None = None, ) -> None: """Instrument httpx Client or AsyncClient @@ -977,9 +982,7 @@ def instrument_client( client._is_instrumented_by_opentelemetry = True @staticmethod - def uninstrument_client( - client: typing.Union[httpx.Client, httpx.AsyncClient], - ): + def uninstrument_client(client: httpx.Client | httpx.AsyncClient) -> None: """Disables instrumentation for the given client instance Args: diff --git a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/py.typed b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/py.typed new file mode 100644 index 0000000000..e69de29bb2 diff --git a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py index 81390ed48f..38a6264c6d 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py @@ -137,27 +137,28 @@ --- """ +from __future__ import annotations + import logging -import typing -from typing import Collection +from typing import Any, Callable, Collection, TypeVar import psycopg # pylint: disable=import-self -from psycopg import ( - AsyncCursor as pg_async_cursor, # pylint: disable=import-self,no-name-in-module -) -from psycopg import ( - Cursor as pg_cursor, # pylint: disable=no-name-in-module,import-self -) from psycopg.sql import Composed # pylint: disable=no-name-in-module from opentelemetry.instrumentation import dbapi from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.psycopg.package import _instruments from opentelemetry.instrumentation.psycopg.version import __version__ +from opentelemetry.trace import TracerProvider _logger = logging.getLogger(__name__) _OTEL_CURSOR_FACTORY_KEY = "_otel_orig_cursor_factory" +ConnectionT = TypeVar( + "ConnectionT", psycopg.Connection, psycopg.AsyncConnection +) +CursorT = TypeVar("CursorT", psycopg.Cursor, psycopg.AsyncCursor) + class PsycopgInstrumentor(BaseInstrumentor): _CONNECTION_ATTRIBUTES = { @@ -172,7 +173,7 @@ class PsycopgInstrumentor(BaseInstrumentor): def instrumentation_dependencies(self) -> Collection[str]: return _instruments - def _instrument(self, **kwargs): + def _instrument(self, **kwargs: Any): """Integrate with PostgreSQL Psycopg library. Psycopg: http://initd.org/psycopg/ """ @@ -223,7 +224,7 @@ def _instrument(self, **kwargs): enable_attribute_commenter=enable_attribute_commenter, ) - def _uninstrument(self, **kwargs): + def _uninstrument(self, **kwargs: Any): """ "Disable Psycopg instrumentation""" dbapi.unwrap_connect(psycopg, "connect") # pylint: disable=no-member dbapi.unwrap_connect( @@ -237,7 +238,9 @@ def _uninstrument(self, **kwargs): # TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql @staticmethod - def instrument_connection(connection, tracer_provider=None): + def instrument_connection( + connection: ConnectionT, tracer_provider: TracerProvider | None = None + ) -> ConnectionT: """Enable instrumentation in a psycopg connection. Args: @@ -269,7 +272,7 @@ def instrument_connection(connection, tracer_provider=None): # TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql @staticmethod - def uninstrument_connection(connection): + def uninstrument_connection(connection: ConnectionT) -> ConnectionT: connection.cursor_factory = getattr( connection, _OTEL_CURSOR_FACTORY_KEY, None ) @@ -281,9 +284,9 @@ def uninstrument_connection(connection): class DatabaseApiIntegration(dbapi.DatabaseApiIntegration): def wrapped_connection( self, - connect_method: typing.Callable[..., typing.Any], - args: typing.Tuple[typing.Any, typing.Any], - kwargs: typing.Dict[typing.Any, typing.Any], + connect_method: Callable[..., Any], + args: tuple[Any, Any], + kwargs: dict[Any, Any], ): """Add object proxy to connection object.""" base_cursor_factory = kwargs.pop("cursor_factory", None) @@ -299,9 +302,9 @@ def wrapped_connection( class DatabaseApiAsyncIntegration(dbapi.DatabaseApiIntegration): async def wrapped_connection( self, - connect_method: typing.Callable[..., typing.Any], - args: typing.Tuple[typing.Any, typing.Any], - kwargs: typing.Dict[typing.Any, typing.Any], + connect_method: Callable[..., Any], + args: tuple[Any, Any], + kwargs: dict[Any, Any], ): """Add object proxy to connection object.""" base_cursor_factory = kwargs.pop("cursor_factory", None) @@ -317,7 +320,7 @@ async def wrapped_connection( class CursorTracer(dbapi.CursorTracer): - def get_operation_name(self, cursor, args): + def get_operation_name(self, cursor: CursorT, args: list[Any]) -> str: if not args: return "" @@ -332,7 +335,7 @@ def get_operation_name(self, cursor, args): return "" - def get_statement(self, cursor, args): + def get_statement(self, cursor: CursorT, args: list[Any]) -> str: if not args: return "" @@ -342,7 +345,11 @@ def get_statement(self, cursor, args): return statement -def _new_cursor_factory(db_api=None, base_factory=None, tracer_provider=None): +def _new_cursor_factory( + db_api: DatabaseApiIntegration | None = None, + base_factory: type[psycopg.Cursor] | None = None, + tracer_provider: TracerProvider | None = None, +): if not db_api: db_api = DatabaseApiIntegration( __name__, @@ -352,21 +359,21 @@ def _new_cursor_factory(db_api=None, base_factory=None, tracer_provider=None): tracer_provider=tracer_provider, ) - base_factory = base_factory or pg_cursor + base_factory = base_factory or psycopg.Cursor _cursor_tracer = CursorTracer(db_api) class TracedCursorFactory(base_factory): - def execute(self, *args, **kwargs): + def execute(self, *args: Any, **kwargs: Any): return _cursor_tracer.traced_execution( self, super().execute, *args, **kwargs ) - def executemany(self, *args, **kwargs): + def executemany(self, *args: Any, **kwargs: Any): return _cursor_tracer.traced_execution( self, super().executemany, *args, **kwargs ) - def callproc(self, *args, **kwargs): + def callproc(self, *args: Any, **kwargs: Any): return _cursor_tracer.traced_execution( self, super().callproc, *args, **kwargs ) @@ -375,7 +382,9 @@ def callproc(self, *args, **kwargs): def _new_cursor_async_factory( - db_api=None, base_factory=None, tracer_provider=None + db_api: DatabaseApiAsyncIntegration | None = None, + base_factory: type[psycopg.AsyncCursor] | None = None, + tracer_provider: TracerProvider | None = None, ): if not db_api: db_api = DatabaseApiAsyncIntegration( @@ -385,21 +394,21 @@ def _new_cursor_async_factory( version=__version__, tracer_provider=tracer_provider, ) - base_factory = base_factory or pg_async_cursor + base_factory = base_factory or psycopg.AsyncCursor _cursor_tracer = CursorTracer(db_api) class TracedCursorAsyncFactory(base_factory): - async def execute(self, *args, **kwargs): + async def execute(self, *args: Any, **kwargs: Any): return await _cursor_tracer.traced_execution( self, super().execute, *args, **kwargs ) - async def executemany(self, *args, **kwargs): + async def executemany(self, *args: Any, **kwargs: Any): return await _cursor_tracer.traced_execution( self, super().executemany, *args, **kwargs ) - async def callproc(self, *args, **kwargs): + async def callproc(self, *args: Any, **kwargs: Any): return await _cursor_tracer.traced_execution( self, super().callproc, *args, **kwargs ) diff --git a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/package.py b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/package.py index 635edfb4db..a3ee72d1ae 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/package.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/package.py @@ -11,6 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations - -_instruments = ("psycopg >= 3.1.0",) +_instruments: tuple[str, ...] = ("psycopg >= 3.1.0",) diff --git a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/py.typed b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/py.typed new file mode 100644 index 0000000000..e69de29bb2 diff --git a/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py b/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py index 4ddaad9174..6c9bcf2d4b 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py @@ -132,10 +132,10 @@ class PostgresqlIntegrationTestMixin: def setUp(self): super().setUp() self.cursor_mock = mock.patch( - "opentelemetry.instrumentation.psycopg.pg_cursor", MockCursor + "opentelemetry.instrumentation.psycopg.psycopg.Cursor", MockCursor ) self.cursor_async_mock = mock.patch( - "opentelemetry.instrumentation.psycopg.pg_async_cursor", + "opentelemetry.instrumentation.psycopg.psycopg.AsyncCursor", MockAsyncCursor, ) self.connection_mock = mock.patch("psycopg.connect", MockConnection)