Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for metrics APIs #1044

Merged
merged 1 commit into from
Dec 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions disperser/dataapi/docs/docs.go
Original file line number Diff line number Diff line change
@@ -529,6 +529,57 @@ const docTemplate = `{
}
}
},
"/metrics/summary": {
"get": {
"produces": [
"application/json"
],
"tags": [
"Metrics"
],
"summary": "Fetch metrics summary",
"parameters": [
{
"type": "integer",
"description": "Start unix timestamp [default: 1 hour ago]",
"name": "start",
"in": "query"
},
{
"type": "integer",
"description": "End unix timestamp [default: unix time now]",
"name": "end",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/dataapi.Metric"
}
},
"400": {
"description": "error: Bad request",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"404": {
"description": "error: Not found",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"500": {
"description": "error: Server error",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
}
}
}
},
"/metrics/throughput": {
"get": {
"produces": [
@@ -583,6 +634,60 @@ const docTemplate = `{
}
}
},
"/metrics/timeseries/throughput": {
"get": {
"produces": [
"application/json"
],
"tags": [
"Metrics"
],
"summary": "Fetch throughput time series",
"parameters": [
{
"type": "integer",
"description": "Start unix timestamp [default: 1 hour ago]",
"name": "start",
"in": "query"
},
{
"type": "integer",
"description": "End unix timestamp [default: unix time now]",
"name": "end",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/dataapi.Throughput"
}
}
},
"400": {
"description": "error: Bad request",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"404": {
"description": "error: Not found",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"500": {
"description": "error: Server error",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
}
}
}
},
"/operators-info/deregistered-operators": {
"get": {
"produces": [
105 changes: 105 additions & 0 deletions disperser/dataapi/docs/swagger.json
Original file line number Diff line number Diff line change
@@ -525,6 +525,57 @@
}
}
},
"/metrics/summary": {
"get": {
"produces": [
"application/json"
],
"tags": [
"Metrics"
],
"summary": "Fetch metrics summary",
"parameters": [
{
"type": "integer",
"description": "Start unix timestamp [default: 1 hour ago]",
"name": "start",
"in": "query"
},
{
"type": "integer",
"description": "End unix timestamp [default: unix time now]",
"name": "end",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/dataapi.Metric"
}
},
"400": {
"description": "error: Bad request",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"404": {
"description": "error: Not found",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"500": {
"description": "error: Server error",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
}
}
}
},
"/metrics/throughput": {
"get": {
"produces": [
@@ -579,6 +630,60 @@
}
}
},
"/metrics/timeseries/throughput": {
"get": {
"produces": [
"application/json"
],
"tags": [
"Metrics"
],
"summary": "Fetch throughput time series",
"parameters": [
{
"type": "integer",
"description": "Start unix timestamp [default: 1 hour ago]",
"name": "start",
"in": "query"
},
{
"type": "integer",
"description": "End unix timestamp [default: unix time now]",
"name": "end",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/dataapi.Throughput"
}
}
},
"400": {
"description": "error: Bad request",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"404": {
"description": "error: Not found",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
},
"500": {
"description": "error: Server error",
"schema": {
"$ref": "#/definitions/dataapi.ErrorResponse"
}
}
}
}
},
"/operators-info/deregistered-operators": {
"get": {
"produces": [
68 changes: 68 additions & 0 deletions disperser/dataapi/docs/swagger.yaml
Original file line number Diff line number Diff line change
@@ -697,6 +697,39 @@ paths:
summary: Fetch operators non signing percentage
tags:
- Metrics
/metrics/summary:
get:
parameters:
- description: 'Start unix timestamp [default: 1 hour ago]'
in: query
name: start
type: integer
- description: 'End unix timestamp [default: unix time now]'
in: query
name: end
type: integer
produces:
- application/json
responses:
"200":
description: OK
schema:
$ref: '#/definitions/dataapi.Metric'
"400":
description: 'error: Bad request'
schema:
$ref: '#/definitions/dataapi.ErrorResponse'
"404":
description: 'error: Not found'
schema:
$ref: '#/definitions/dataapi.ErrorResponse'
"500":
description: 'error: Server error'
schema:
$ref: '#/definitions/dataapi.ErrorResponse'
summary: Fetch metrics summary
tags:
- Metrics
/metrics/throughput:
get:
parameters:
@@ -732,6 +765,41 @@ paths:
summary: Fetch throughput time series
tags:
- Metrics
/metrics/timeseries/throughput:
get:
parameters:
- description: 'Start unix timestamp [default: 1 hour ago]'
in: query
name: start
type: integer
- description: 'End unix timestamp [default: unix time now]'
in: query
name: end
type: integer
produces:
- application/json
responses:
"200":
description: OK
schema:
items:
$ref: '#/definitions/dataapi.Throughput'
type: array
"400":
description: 'error: Bad request'
schema:
$ref: '#/definitions/dataapi.ErrorResponse'
"404":
description: 'error: Not found'
schema:
$ref: '#/definitions/dataapi.ErrorResponse'
"500":
description: 'error: Server error'
schema:
$ref: '#/definitions/dataapi.ErrorResponse'
summary: Fetch throughput time series
tags:
- Metrics
/operators-info/deregistered-operators:
get:
produces:
99 changes: 92 additions & 7 deletions disperser/dataapi/server_v2.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"os"
"strconv"
"strings"
"time"

@@ -39,6 +40,10 @@ type (
SignedBatch *SignedBatch `json:"signed_batch"`
BlobVerificationInfos []*corev2.BlobVerificationInfo `json:"blob_verification_infos"`
}

MetricSummary struct {
AvgThroughput float64 `json:"avg_throughput"`
}
)

type ServerInterface interface {
@@ -61,6 +66,7 @@ type ServerV2 struct {
metrics *Metrics

operatorHandler *operatorHandler
metricsHandler *metricsHandler
}

func NewServerV2(
@@ -88,6 +94,7 @@ func NewServerV2(
indexedChainState: indexedChainState,
metrics: metrics,
operatorHandler: newOperatorHandler(l, metrics, chainReader, chainState, indexedChainState, subgraphClient),
metricsHandler: newMetricsHandler(promClient),
}
}

@@ -115,15 +122,15 @@ func (s *ServerV2) Start() error {
}
operators := v2.Group("/operators")
{
operators.GET("/non-signers", s.FetchNonSingers)
operators.GET("/nonsigners", s.FetchNonSingers)
operators.GET("/stake", s.FetchOperatorsStake)
operators.GET("/nodeinfo", s.FetchOperatorsNodeInfo)
operators.GET("/reachability", s.CheckOperatorsReachability)
}
metrics := v2.Group("/metrics")
{
metrics.GET("/overview", s.FetchMetricsOverviewHandler)
metrics.GET("/throughput", s.FetchMetricsThroughputHandler)
metrics.GET("/summary", s.FetchMetricsSummaryHandler)
metrics.GET("/timeseries/throughput", s.FetchMetricsThroughputTimeseriesHandler)
}
swagger := v2.Group("/swagger")
{
@@ -347,10 +354,88 @@ func (s *ServerV2) FetchNonSingers(c *gin.Context) {
errorResponse(c, errors.New("FetchNonSingers unimplemented"))
}

func (s *ServerV2) FetchMetricsOverviewHandler(c *gin.Context) {
errorResponse(c, errors.New("FetchMetricsOverviewHandler unimplemented"))
// FetchMetricsSummaryHandler godoc
//
// @Summary Fetch metrics summary
// @Tags Metrics
// @Produce json
// @Param start query int false "Start unix timestamp [default: 1 hour ago]"
// @Param end query int false "End unix timestamp [default: unix time now]"
// @Success 200 {object} Metric
// @Failure 400 {object} ErrorResponse "error: Bad request"
// @Failure 404 {object} ErrorResponse "error: Not found"
// @Failure 500 {object} ErrorResponse "error: Server error"
// @Router /metrics/summary [get]
func (s *ServerV2) FetchMetricsSummaryHandler(c *gin.Context) {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
s.metrics.ObserveLatency("FetchMetricsSummary", f*1000) // make milliseconds
}))
defer timer.ObserveDuration()

now := time.Now()
start, err := strconv.ParseInt(c.DefaultQuery("start", "0"), 10, 64)
if err != nil || start == 0 {
start = now.Add(-time.Hour * 1).Unix()
}

end, err := strconv.ParseInt(c.DefaultQuery("end", "0"), 10, 64)
if err != nil || end == 0 {
end = now.Unix()
}

avgThroughput, err := s.metricsHandler.getAvgThroughput(c.Request.Context(), start, end)
if err != nil {
s.metrics.IncrementFailedRequestNum("FetchMetricsSummary")
errorResponse(c, err)
return
}

metricSummary := &MetricSummary{
AvgThroughput: avgThroughput,
}

s.metrics.IncrementSuccessfulRequestNum("FetchMetricsSummary")
c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxMetricAage))
c.JSON(http.StatusOK, metricSummary)
}

func (s *ServerV2) FetchMetricsThroughputHandler(c *gin.Context) {
errorResponse(c, errors.New("FetchMetricsThroughputHandler unimplemented"))
// FetchMetricsThroughputTimeseriesHandler godoc
//
// @Summary Fetch throughput time series
// @Tags Metrics
// @Produce json
// @Param start query int false "Start unix timestamp [default: 1 hour ago]"
// @Param end query int false "End unix timestamp [default: unix time now]"
// @Success 200 {object} []Throughput
// @Failure 400 {object} ErrorResponse "error: Bad request"
// @Failure 404 {object} ErrorResponse "error: Not found"
// @Failure 500 {object} ErrorResponse "error: Server error"
// @Router /metrics/timeseries/throughput [get]
func (s *ServerV2) FetchMetricsThroughputTimeseriesHandler(c *gin.Context) {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
s.metrics.ObserveLatency("FetchMetricsThroughputTimeseriesHandler", f*1000) // make milliseconds
}))
defer timer.ObserveDuration()

now := time.Now()
start, err := strconv.ParseInt(c.DefaultQuery("start", "0"), 10, 64)
if err != nil || start == 0 {
start = now.Add(-time.Hour * 1).Unix()
}

end, err := strconv.ParseInt(c.DefaultQuery("end", "0"), 10, 64)
if err != nil || end == 0 {
end = now.Unix()
}

ths, err := s.metricsHandler.getThroughputTimeseries(c.Request.Context(), start, end)
if err != nil {
s.metrics.IncrementFailedRequestNum("FetchMetricsThroughputTimeseriesHandler")
errorResponse(c, err)
return
}

s.metrics.IncrementSuccessfulRequestNum("FetchMetricsThroughputTimeseriesHandler")
c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxThroughputAge))
c.JSON(http.StatusOK, ths)
}
74 changes: 74 additions & 0 deletions disperser/dataapi/server_v2_test.go
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ import (
"github.com/consensys/gnark-crypto/ecc/bn254/fp"
"github.com/google/uuid"
"github.com/ory/dockertest/v3"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -336,3 +337,76 @@ func TestFetchOperatorsStake(t *testing.T) {
assert.Equal(t, opId1.Hex(), ops[0].OperatorId)
assert.Equal(t, opId0.Hex(), ops[1].OperatorId)
}

func TestFetchMetricsSummaryHandler(t *testing.T) {
r := setUpRouter()

s := new(model.SampleStream)
err := s.UnmarshalJSON([]byte(mockPrometheusResponse))
assert.NoError(t, err)

matrix := make(model.Matrix, 0)
matrix = append(matrix, s)
mockPrometheusApi.On("QueryRange").Return(matrix, nil, nil).Once()

r.GET("/v2/metrics/summary", testDataApiServerV2.FetchMetricsSummaryHandler)

req := httptest.NewRequest(http.MethodGet, "/v2/metrics/summary", nil)
req.Close = true
w := httptest.NewRecorder()
r.ServeHTTP(w, req)

res := w.Result()
defer res.Body.Close()

data, err := io.ReadAll(res.Body)
assert.NoError(t, err)

var response dataapi.MetricSummary
err = json.Unmarshal(data, &response)
assert.NoError(t, err)
assert.NotNil(t, response)

assert.Equal(t, http.StatusOK, res.StatusCode)
assert.Equal(t, 16555.555555555555, response.AvgThroughput)
}

func TestFetchMetricsThroughputTimeseriesHandler(t *testing.T) {
r := setUpRouter()

s := new(model.SampleStream)
err := s.UnmarshalJSON([]byte(mockPrometheusRespAvgThroughput))
assert.NoError(t, err)

matrix := make(model.Matrix, 0)
matrix = append(matrix, s)
mockPrometheusApi.On("QueryRange").Return(matrix, nil, nil).Once()

r.GET("/v2/metrics/timeseries/throughput", testDataApiServer.FetchMetricsThroughputHandler)

w := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "/v2/metrics/timeseries/throughput", nil)
r.ServeHTTP(w, req)

res := w.Result()
defer res.Body.Close()

data, err := io.ReadAll(res.Body)
assert.NoError(t, err)

var response []*dataapi.Throughput
err = json.Unmarshal(data, &response)
assert.NoError(t, err)
assert.NotNil(t, response)

var totalThroughput float64
for _, v := range response {
totalThroughput += v.Throughput
}

assert.Equal(t, http.StatusOK, res.StatusCode)
assert.Equal(t, 3361, len(response))
assert.Equal(t, float64(12000), response[0].Throughput)
assert.Equal(t, uint64(1701292920), response[0].Timestamp)
assert.Equal(t, float64(3.503022666666651e+07), totalThroughput)
}