From 7986717fbd5cc923c2e0a1749d4e41ad168d8c4c Mon Sep 17 00:00:00 2001 From: Sai Medhini Reddy Maryada Date: Tue, 2 Apr 2024 23:06:00 -0700 Subject: [PATCH] Updated Signed-off-by: Sai Medhini Reddy Maryada --- opensearchpy/_async/client/client.py | 6 ++++-- opensearchpy/_async/http_aiohttp.py | 11 +++++++++-- opensearchpy/_async/transport.py | 6 ++++++ 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/opensearchpy/_async/client/client.py b/opensearchpy/_async/client/client.py index 091bb5e9..15031b82 100644 --- a/opensearchpy/_async/client/client.py +++ b/opensearchpy/_async/client/client.py @@ -11,7 +11,7 @@ from opensearchpy.client.utils import _normalize_hosts from opensearchpy.transport import Transport - +from opensearchpy.metrics.metrics import Metrics class Client(object): """ @@ -22,6 +22,7 @@ def __init__( self, hosts: Optional[str] = None, transport_class: Type[Transport] = Transport, + metrics: Optional[Metrics] = None, **kwargs: Any ) -> None: """ @@ -38,4 +39,5 @@ class as kwargs, or a string in the format of ``host[:port]`` which will be :class:`~opensearchpy.Transport` class and, subsequently, to the :class:`~opensearchpy.Connection` instances. """ - self.transport = transport_class(_normalize_hosts(hosts), **kwargs) + print("async opensearch printed") + self.transport = transport_class(_normalize_hosts(hosts), metrics=metrics, **kwargs) diff --git a/opensearchpy/_async/http_aiohttp.py b/opensearchpy/_async/http_aiohttp.py index c49fd574..1be66380 100644 --- a/opensearchpy/_async/http_aiohttp.py +++ b/opensearchpy/_async/http_aiohttp.py @@ -32,7 +32,7 @@ from typing import Any, Collection, Mapping, Optional, Union import urllib3 - +from opensearchpy.metrics.metrics import Metrics from ..compat import reraise_exceptions, urlencode from ..connection.base import Connection from ..exceptions import ( @@ -93,6 +93,7 @@ def __init__( opaque_id: Optional[str] = None, loop: Any = None, trust_env: Optional[bool] = False, + metrics: Optional[Metrics] = None, **kwargs: Any ) -> None: """ @@ -128,7 +129,8 @@ def __init__( For tracing all requests made by this transport. :arg loop: asyncio Event Loop to use with aiohttp. This is set by default to the currently running loop. """ - + self.metrics = metrics + print("printed metrics in aiohttp connection", self.metrics) self.headers = {} super().__init__( @@ -293,6 +295,8 @@ async def perform_request( start = self.loop.time() try: + if self.metrics is not None: + self.metrics.request_start() async with self.session.request( method, url, @@ -327,6 +331,9 @@ async def perform_request( ): raise ConnectionTimeout("TIMEOUT", str(e), e) raise ConnectionError("N/A", str(e), e) + finally: + if self.metrics is not None: + self.metrics.request_end() # raise warnings if any from the 'Warnings' header. warning_headers = response.headers.getall("warning", ()) diff --git a/opensearchpy/_async/transport.py b/opensearchpy/_async/transport.py index e8b17252..070e4bdc 100644 --- a/opensearchpy/_async/transport.py +++ b/opensearchpy/_async/transport.py @@ -45,6 +45,7 @@ from ..transport import Transport, get_host_info from .compat import get_running_loop from .http_aiohttp import AIOHttpConnection +from opensearchpy.metrics.metrics import Metrics logger = logging.getLogger("opensearch") @@ -60,6 +61,7 @@ class AsyncTransport(Transport): DEFAULT_CONNECTION_CLASS = AIOHttpConnection sniffing_task: Any = None + metrics: Optional[Metrics] def __init__( self, @@ -78,6 +80,7 @@ def __init__( retry_on_status: Any = (502, 503, 504), retry_on_timeout: bool = False, send_get_body_as: str = "GET", + metrics: Optional[Metrics] = None, **kwargs: Any ) -> None: """ @@ -117,6 +120,8 @@ def __init__( when creating and instance unless overridden by that connection's options provided as part of the hosts parameter. """ + self.metrics = metrics + print("printed metrics in transport", self.metrics) self.sniffing_task = None self.loop: Any = None self._async_init_called = False @@ -138,6 +143,7 @@ def __init__( retry_on_status=retry_on_status, retry_on_timeout=retry_on_timeout, send_get_body_as=send_get_body_as, + metrics=self.metrics, **kwargs )