Skip to content

Commit

Permalink
metrics: scope http requests metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
nosahama committed Jan 21, 2025
1 parent 1f6e0cf commit 618b21f
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 172 deletions.
4 changes: 2 additions & 2 deletions src/schema_registry/telemetry/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.semconv.attributes import telemetry_attributes as T
from schema_registry.telemetry.meter import Meter
from schema_registry.telemetry.metrics import Metrics
from schema_registry.telemetry.metrics import HTTPRequestMetrics
from schema_registry.telemetry.tracer import Tracer


Expand All @@ -32,6 +32,6 @@ class TelemetryContainer(containers.DeclarativeContainer):
telemetry_resource = providers.Factory(create_telemetry_resource, config=karapace_container.config)

meter = providers.Singleton(Meter)
metrics = providers.Singleton(Metrics, meter=meter)
http_request_metrics = providers.Singleton(HTTPRequestMetrics, meter=meter)
tracer = providers.Singleton(Tracer)
tracer_provider = providers.Singleton(TracerProvider, resource=telemetry_resource)
40 changes: 36 additions & 4 deletions src/schema_registry/telemetry/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,57 @@
See LICENSE for details
"""

from fastapi import Request, Response, HTTPException
from opentelemetry.metrics import Counter, Histogram, UpDownCounter
from schema_registry.telemetry.meter import Meter
from typing import Final
from collections.abc import Mapping

import time

class Metrics:

class HTTPRequestMetrics:
START_TIME_KEY: Final = "start_time"

def __init__(self, meter: Meter):
self.karapace_http_requests_in_progress: UpDownCounter = meter.get_meter().create_up_down_counter(
self.meter = meter
self.karapace_http_requests_in_progress: Final[UpDownCounter] = self.meter.get_meter().create_up_down_counter(
name="karapace_http_requests_in_progress",
description="In-progress requests for HTTP/TCP Protocol",
)
self.karapace_http_requests_duration_seconds: Histogram = meter.get_meter().create_histogram(
self.karapace_http_requests_duration_seconds: Final[Histogram] = self.meter.get_meter().create_histogram(
unit="seconds",
name="karapace_http_requests_duration_seconds",
description="Request Duration for HTTP/TCP Protocol",
)
self.karapace_http_requests_total: Counter = meter.get_meter().create_counter(
self.karapace_http_requests_total: Final[Counter] = self.meter.get_meter().create_counter(
name="karapace_http_requests_total",
description="Total Request Count for HTTP/TCP Protocol",
)

def get_resource_from_request(self, request: Request) -> str:
return request.url.path.split("/")[1]

def start_request(self, request: Request) -> Mapping[str, str | int]:
# Set start time for request
setattr(request.state, self.START_TIME_KEY, time.monotonic())

PATH = request.url.path
METHOD = request.method
ATTRIBUTES = {"method": METHOD, "path": PATH, "resource": self.get_resource_from_request(request=request)}

self.karapace_http_requests_in_progress.add(amount=1, attributes=ATTRIBUTES)
return ATTRIBUTES

def finish_request(self, ATTRIBUTES: Mapping[str, str | int], request: Request, response: Response | None) -> None:
status = response.status_code if response else 0
self.karapace_http_requests_total.add(amount=1, attributes={**ATTRIBUTES, "status": status})
self.karapace_http_requests_duration_seconds.record(
amount=(time.monotonic() - getattr(request.state, self.START_TIME_KEY)),
attributes=ATTRIBUTES,
)
self.karapace_http_requests_in_progress.add(amount=-1, attributes=ATTRIBUTES)

def record_request_exception(self, ATTRIBUTES: Mapping[str, str | int], exc: Exception) -> None:
status = exc.status_code if isinstance(exc, HTTPException) else 0
self.karapace_http_requests_total.add(amount=1, attributes={**ATTRIBUTES, "status": status})
38 changes: 9 additions & 29 deletions src/schema_registry/telemetry/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@

from collections.abc import Awaitable, Callable
from dependency_injector.wiring import inject, Provide
from fastapi import FastAPI, Request, Response, HTTPException
from fastapi import FastAPI, Request, Response
from opentelemetry.trace import SpanKind, Status, StatusCode
from schema_registry.telemetry.container import TelemetryContainer
from schema_registry.telemetry.metrics import Metrics
from schema_registry.telemetry.metrics import HTTPRequestMetrics
from schema_registry.telemetry.tracer import Tracer

import logging
import time

LOG = logging.getLogger(__name__)

Expand All @@ -22,47 +21,28 @@ async def telemetry_middleware(
request: Request,
call_next: Callable[[Request], Awaitable[Response]],
tracer: Tracer = Provide[TelemetryContainer.tracer],
metrics: Metrics = Provide[TelemetryContainer.metrics],
http_request_metrics: HTTPRequestMetrics = Provide[TelemetryContainer.http_request_metrics],
) -> Response | None:
RESOURCE = request.url.path.split("/")[1]
RESOURCE = http_request_metrics.get_resource_from_request(request=request)
with tracer.get_tracer().start_as_current_span(name=f"{request.method}: /{RESOURCE}", kind=SpanKind.SERVER) as SPAN:
SPAN.add_event("Creating metering resources")

# Set start time for request
setattr(request.state, metrics.START_TIME_KEY, time.monotonic())
ATTRIBUTES = http_request_metrics.start_request(request=request)
tracer.update_span_with_request(request=request, span=SPAN)

PATH = request.url.path
METHOD = request.method
ATTRIBUTES = {"method": METHOD, "path": PATH, "resource": RESOURCE}

SPAN.add_event("Metering requests in progress (increase)")
metrics.karapace_http_requests_in_progress.add(amount=1, attributes=ATTRIBUTES)

try:
SPAN.add_event("Calling request handler")
response: Response = await call_next(request)
SPAN.set_status(Status(StatusCode.OK))
except Exception as exc:
status = exc.status_code if isinstance(exc, HTTPException) else 0
SPAN.add_event("Metering total requests on exception")
metrics.karapace_http_requests_total.add(amount=1, attributes={**ATTRIBUTES, "status": status})
http_request_metrics.record_request_exception(ATTRIBUTES=ATTRIBUTES, exc=exc)
SPAN.set_status(Status(StatusCode.ERROR))
SPAN.record_exception(exc)
else:
SPAN.add_event("Metering total requests")
metrics.karapace_http_requests_total.add(amount=1, attributes={**ATTRIBUTES, "status": response.status_code})
SPAN.add_event("Update span with response details")
tracer.update_span_with_response(response=response, span=SPAN)
return response
finally:
SPAN.add_event("Metering request duration")
metrics.karapace_http_requests_duration_seconds.record(
amount=(time.monotonic() - getattr(request.state, metrics.START_TIME_KEY)),
attributes=ATTRIBUTES,
http_request_metrics.finish_request(
ATTRIBUTES=ATTRIBUTES, request=request, response=response if "response" in locals() else None
)
SPAN.add_event("Metering requests in progress (decrease)")
metrics.karapace_http_requests_in_progress.add(amount=-1, attributes=ATTRIBUTES)

return None


Expand Down
103 changes: 94 additions & 9 deletions tests/unit/schema_registry/telemetry/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,37 @@
See LICENSE for details
"""

from schema_registry.telemetry.metrics import Metrics
from fastapi import Request, Response, HTTPException
from schema_registry.telemetry.metrics import HTTPRequestMetrics
from schema_registry.telemetry.meter import Meter
from unittest.mock import call, MagicMock
from unittest.mock import AsyncMock, patch

import pytest

def test_metrics():

@pytest.fixture
def http_request_metrics() -> HTTPRequestMetrics:
meter = MagicMock(spec=Meter)
metrics = Metrics(meter=meter)
return HTTPRequestMetrics(meter=meter)


@pytest.fixture
def request_mock() -> AsyncMock:
request_mock = AsyncMock(spec=Request)
request_mock.method = "GET"
request_mock.url.path = "/test/inner-path"
return request_mock

assert hasattr(metrics, "karapace_http_requests_in_progress")
assert hasattr(metrics, "karapace_http_requests_duration_seconds")
assert hasattr(metrics, "karapace_http_requests_total")
assert hasattr(metrics, "START_TIME_KEY")
assert metrics.START_TIME_KEY == "start_time"

meter.assert_has_calls(
def test_http_request_metrics_objects(http_request_metrics: HTTPRequestMetrics):
assert hasattr(http_request_metrics, "karapace_http_requests_in_progress")
assert hasattr(http_request_metrics, "karapace_http_requests_duration_seconds")
assert hasattr(http_request_metrics, "karapace_http_requests_total")
assert hasattr(http_request_metrics, "START_TIME_KEY")
assert http_request_metrics.START_TIME_KEY == "start_time"

http_request_metrics.meter.assert_has_calls(
[
call.get_meter(),
call.get_meter().create_up_down_counter(
Expand All @@ -38,3 +53,73 @@ def test_metrics():
),
]
)


def test_get_resource_from_request(http_request_metrics: HTTPRequestMetrics, request_mock: AsyncMock) -> None:
assert http_request_metrics.get_resource_from_request(request=request_mock) == "test"


def test_start_request(http_request_metrics: HTTPRequestMetrics, request_mock: AsyncMock) -> None:
http_request_metrics.karapace_http_requests_in_progress = MagicMock()

with patch("schema_registry.telemetry.metrics.time.monotonic", return_value=1):
ATTRIBUTES = http_request_metrics.start_request(request=request_mock)
http_request_metrics.karapace_http_requests_in_progress.add.assert_called_with(amount=1, attributes=ATTRIBUTES)
assert ATTRIBUTES == {"method": "GET", "path": "/test/inner-path", "resource": "test"}


def test_finish_request(http_request_metrics: HTTPRequestMetrics, request_mock: AsyncMock) -> None:
ATTRIBUTES = {"method": "GET", "path": "/test/inner-path", "resource": "test"}
response_mock = AsyncMock(spec=Response)
response_mock.status_code = 200
http_request_metrics.karapace_http_requests_duration_seconds = MagicMock()
http_request_metrics.karapace_http_requests_in_progress = MagicMock()
http_request_metrics.karapace_http_requests_total = MagicMock()

with patch("schema_registry.telemetry.metrics.time.monotonic", return_value=3):
http_request_metrics.finish_request(ATTRIBUTES=ATTRIBUTES, request=request_mock, response=response_mock)
http_request_metrics.karapace_http_requests_duration_seconds.record.assert_called_with(
amount=3 - request_mock.state.start_time, attributes=ATTRIBUTES
)
http_request_metrics.karapace_http_requests_in_progress.add.assert_called_with(amount=-1, attributes=ATTRIBUTES)
http_request_metrics.karapace_http_requests_total.add.assert_called_with(
amount=1, attributes={**ATTRIBUTES, "status": 200}
)


def test_finish_request_without_response(http_request_metrics: HTTPRequestMetrics, request_mock: AsyncMock) -> None:
ATTRIBUTES = {"method": "GET", "path": "/test/inner-path", "resource": "test"}
http_request_metrics.karapace_http_requests_duration_seconds = MagicMock()
http_request_metrics.karapace_http_requests_in_progress = MagicMock()
http_request_metrics.karapace_http_requests_total = MagicMock()

with patch("schema_registry.telemetry.metrics.time.monotonic", return_value=3):
http_request_metrics.finish_request(ATTRIBUTES=ATTRIBUTES, request=request_mock, response=None)
http_request_metrics.karapace_http_requests_duration_seconds.record.assert_called_with(
amount=3 - request_mock.state.start_time, attributes=ATTRIBUTES
)
http_request_metrics.karapace_http_requests_in_progress.add.assert_called_with(amount=-1, attributes=ATTRIBUTES)
http_request_metrics.karapace_http_requests_total.add.assert_called_with(
amount=1, attributes={**ATTRIBUTES, "status": 0}
)


def test_record_request_exception_http_exception(http_request_metrics: HTTPRequestMetrics) -> None:
exception = HTTPException(status_code=404)
ATTRIBUTES = {"method": "GET", "path": "/test/inner-path", "resource": "test"}
http_request_metrics.karapace_http_requests_total = MagicMock()

http_request_metrics.record_request_exception(ATTRIBUTES=ATTRIBUTES, exc=exception)
http_request_metrics.karapace_http_requests_total.add.assert_called_with(
amount=1, attributes={**ATTRIBUTES, "status": 404}
)


def test_record_request_exception_uncaught_exception(http_request_metrics: HTTPRequestMetrics) -> None:
ATTRIBUTES = {"method": "GET", "path": "/test/inner-path", "resource": "test"}
http_request_metrics.karapace_http_requests_total = MagicMock()

http_request_metrics.record_request_exception(ATTRIBUTES=ATTRIBUTES, exc=Exception())
http_request_metrics.karapace_http_requests_total.add.assert_called_with(
amount=1, attributes={**ATTRIBUTES, "status": 0}
)
Loading

0 comments on commit 618b21f

Please sign in to comment.