Skip to content

Commit

Permalink
Merge pull request #950 from julep-ai/f/cozo-metrics-historgram
Browse files Browse the repository at this point in the history
fix: Fix and refactor metrics
  • Loading branch information
whiterabbit1983 authored Dec 12, 2024
2 parents 9258698 + f47cb92 commit 682da5a
Showing 1 changed file with 75 additions and 25 deletions.
100 changes: 75 additions & 25 deletions agents-api/agents_api/metrics/counters.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,87 @@
import inspect
import time
from functools import wraps
from typing import Awaitable, Callable, ParamSpec, TypeVar

from prometheus_client import Counter, Summary
from prometheus_client import Counter, Histogram, Summary
from prometheus_client.utils import INF

P = ParamSpec("P")
T = TypeVar("T")


def query_metrics_update(metric_label: str, id_field_name: str = "developer_id"):
id_field_name = "developer_id"
labelnames = ("developer_id", "query_name")
buckets = (
0.005,
0.01,
0.025,
0.05,
0.075,
0.1,
0.25,
0.5,
0.75,
1.0,
2.5,
5.0,
7.5,
10.0,
15.0,
20.0,
25.0,
30.0,
INF,
)
counter = Counter(
f"db_query_counter",
f"Number of db calls",
labelnames=labelnames,
)
summary = Summary(
f"db_query_latency_summary",
"Database query latency summary",
labelnames=labelnames,
)
hist = Histogram(
f"db_query_latency_hist",
"Database query latency histogram",
labelnames=labelnames,
buckets=buckets,
)


def query_metrics_update(metric_label: str):
def decor(func: Callable[P, T | Awaitable[T]]):
counter = Counter(
f"cozo_counter_{metric_label}",
f"Number of {metric_label} calls",
labelnames=(id_field_name,),
)
summary = Summary(
f"cozo_latency_{metric_label}",
metric_label,
)

@summary.time()
@wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
counter.labels(kwargs.get(id_field_name, "not_set")).inc()
return func(*args, **kwargs)

@summary.time()
@wraps(func)
async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
counter.labels(kwargs.get(id_field_name, "not_set")).inc()
return await func(*args, **kwargs)

return async_wrapper if inspect.iscoroutinefunction(func) else wrapper
if inspect.iscoroutinefunction(func):

@wraps(func)
async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
fld_id = kwargs.get(id_field_name, "not_set")
labels_dict = {id_field_name: fld_id, "query_name": metric_label}
counter.labels(**labels_dict).inc()
start_time = time.time()
result = await func(*args, **kwargs)
end_time = time.time() - start_time
summary.labels(**labels_dict).observe(end_time)
hist.labels(**labels_dict).observe(end_time)
return result

return async_wrapper
else:

@wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
fld_id = kwargs.get(id_field_name, "not_set")
labels_dict = {id_field_name: fld_id, "query_name": metric_label}
counter.labels(**labels_dict).inc()
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time() - start_time
summary.labels(**labels_dict).observe(end_time)
hist.labels(**labels_dict).observe(end_time)
return result

return wrapper

return decor

0 comments on commit 682da5a

Please sign in to comment.