From 122a4385ab056154d696af6381f17bc4042d8f8f Mon Sep 17 00:00:00 2001 From: Jian Xiao <99709935+jianoaix@users.noreply.github.com> Date: Fri, 20 Dec 2024 11:37:19 -0800 Subject: [PATCH] Add support for metrics APIs (#1044) --- disperser/dataapi/docs/docs.go | 105 ++++++++++++++++++++++++++++ disperser/dataapi/docs/swagger.json | 105 ++++++++++++++++++++++++++++ disperser/dataapi/docs/swagger.yaml | 68 ++++++++++++++++++ disperser/dataapi/server_v2.go | 99 ++++++++++++++++++++++++-- disperser/dataapi/server_v2_test.go | 74 ++++++++++++++++++++ 5 files changed, 444 insertions(+), 7 deletions(-) diff --git a/disperser/dataapi/docs/docs.go b/disperser/dataapi/docs/docs.go index 79e490408e..bbc26fc7e8 100644 --- a/disperser/dataapi/docs/docs.go +++ b/disperser/dataapi/docs/docs.go @@ -529,6 +529,57 @@ const docTemplate = `{ } } }, + "/metrics/summary": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "Metrics" + ], + "summary": "Fetch metrics summary", + "parameters": [ + { + "type": "integer", + "description": "Start unix timestamp [default: 1 hour ago]", + "name": "start", + "in": "query" + }, + { + "type": "integer", + "description": "End unix timestamp [default: unix time now]", + "name": "end", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.Metric" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, "/metrics/throughput": { "get": { "produces": [ @@ -583,6 +634,60 @@ const docTemplate = `{ } } }, + "/metrics/timeseries/throughput": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "Metrics" + ], + "summary": "Fetch throughput 
time series", + "parameters": [ + { + "type": "integer", + "description": "Start unix timestamp [default: 1 hour ago]", + "name": "start", + "in": "query" + }, + { + "type": "integer", + "description": "End unix timestamp [default: unix time now]", + "name": "end", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/dataapi.Throughput" + } + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, "/operators-info/deregistered-operators": { "get": { "produces": [ diff --git a/disperser/dataapi/docs/swagger.json b/disperser/dataapi/docs/swagger.json index 82011b9bbb..9f8e09c676 100644 --- a/disperser/dataapi/docs/swagger.json +++ b/disperser/dataapi/docs/swagger.json @@ -525,6 +525,57 @@ } } }, + "/metrics/summary": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "Metrics" + ], + "summary": "Fetch metrics summary", + "parameters": [ + { + "type": "integer", + "description": "Start unix timestamp [default: 1 hour ago]", + "name": "start", + "in": "query" + }, + { + "type": "integer", + "description": "End unix timestamp [default: unix time now]", + "name": "end", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.Metric" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": 
"#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, "/metrics/throughput": { "get": { "produces": [ @@ -579,6 +630,60 @@ } } }, + "/metrics/timeseries/throughput": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "Metrics" + ], + "summary": "Fetch throughput time series", + "parameters": [ + { + "type": "integer", + "description": "Start unix timestamp [default: 1 hour ago]", + "name": "start", + "in": "query" + }, + { + "type": "integer", + "description": "End unix timestamp [default: unix time now]", + "name": "end", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/dataapi.Throughput" + } + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, "/operators-info/deregistered-operators": { "get": { "produces": [ diff --git a/disperser/dataapi/docs/swagger.yaml b/disperser/dataapi/docs/swagger.yaml index 84b9f71826..c479f789d8 100644 --- a/disperser/dataapi/docs/swagger.yaml +++ b/disperser/dataapi/docs/swagger.yaml @@ -697,6 +697,39 @@ paths: summary: Fetch operators non signing percentage tags: - Metrics + /metrics/summary: + get: + parameters: + - description: 'Start unix timestamp [default: 1 hour ago]' + in: query + name: start + type: integer + - description: 'End unix timestamp [default: unix time now]' + in: query + name: end + type: integer + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/dataapi.Metric' + "400": + description: 'error: Bad request' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "404": + description: 'error: Not found' + schema: + 
$ref: '#/definitions/dataapi.ErrorResponse' + "500": + description: 'error: Server error' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + summary: Fetch metrics summary + tags: + - Metrics /metrics/throughput: get: parameters: @@ -732,6 +765,41 @@ paths: summary: Fetch throughput time series tags: - Metrics + /metrics/timeseries/throughput: + get: + parameters: + - description: 'Start unix timestamp [default: 1 hour ago]' + in: query + name: start + type: integer + - description: 'End unix timestamp [default: unix time now]' + in: query + name: end + type: integer + produces: + - application/json + responses: + "200": + description: OK + schema: + items: + $ref: '#/definitions/dataapi.Throughput' + type: array + "400": + description: 'error: Bad request' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "404": + description: 'error: Not found' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "500": + description: 'error: Server error' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + summary: Fetch throughput time series + tags: + - Metrics /operators-info/deregistered-operators: get: produces: diff --git a/disperser/dataapi/server_v2.go b/disperser/dataapi/server_v2.go index 74d12e4a80..741a98ff42 100644 --- a/disperser/dataapi/server_v2.go +++ b/disperser/dataapi/server_v2.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "os" + "strconv" "strings" "time" @@ -39,6 +40,10 @@ type ( SignedBatch *SignedBatch `json:"signed_batch"` BlobVerificationInfos []*corev2.BlobVerificationInfo `json:"blob_verification_infos"` } + + MetricSummary struct { + AvgThroughput float64 `json:"avg_throughput"` + } ) type ServerInterface interface { @@ -61,6 +66,7 @@ type ServerV2 struct { metrics *Metrics operatorHandler *operatorHandler + metricsHandler *metricsHandler } func NewServerV2( @@ -88,6 +94,7 @@ func NewServerV2( indexedChainState: indexedChainState, metrics: metrics, operatorHandler: newOperatorHandler(l, metrics, chainReader, chainState, 
indexedChainState, subgraphClient), + metricsHandler: newMetricsHandler(promClient), } } @@ -115,15 +122,15 @@ func (s *ServerV2) Start() error { } operators := v2.Group("/operators") { - operators.GET("/non-signers", s.FetchNonSingers) + operators.GET("/nonsigners", s.FetchNonSingers) operators.GET("/stake", s.FetchOperatorsStake) operators.GET("/nodeinfo", s.FetchOperatorsNodeInfo) operators.GET("/reachability", s.CheckOperatorsReachability) } metrics := v2.Group("/metrics") { - metrics.GET("/overview", s.FetchMetricsOverviewHandler) - metrics.GET("/throughput", s.FetchMetricsThroughputHandler) + metrics.GET("/summary", s.FetchMetricsSummaryHandler) + metrics.GET("/timeseries/throughput", s.FetchMetricsThroughputTimeseriesHandler) } swagger := v2.Group("/swagger") { @@ -347,10 +354,88 @@ func (s *ServerV2) FetchNonSingers(c *gin.Context) { errorResponse(c, errors.New("FetchNonSingers unimplemented")) } -func (s *ServerV2) FetchMetricsOverviewHandler(c *gin.Context) { - errorResponse(c, errors.New("FetchMetricsOverviewHandler unimplemented")) +// FetchMetricsSummaryHandler godoc +// +// @Summary Fetch metrics summary +// @Tags Metrics +// @Produce json +// @Param start query int false "Start unix timestamp [default: 1 hour ago]" +// @Param end query int false "End unix timestamp [default: unix time now]" +// @Success 200 {object} MetricSummary +// @Failure 400 {object} ErrorResponse "error: Bad request" +// @Failure 404 {object} ErrorResponse "error: Not found" +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /metrics/summary [get] +func (s *ServerV2) FetchMetricsSummaryHandler(c *gin.Context) { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("FetchMetricsSummary", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + now := time.Now() + start, err := strconv.ParseInt(c.DefaultQuery("start", "0"), 10, 64) + if err != nil || start == 0 { + start = now.Add(-time.Hour * 1).Unix() +
} + + end, err := strconv.ParseInt(c.DefaultQuery("end", "0"), 10, 64) + if err != nil || end == 0 { + end = now.Unix() + } + + avgThroughput, err := s.metricsHandler.getAvgThroughput(c.Request.Context(), start, end) + if err != nil { + s.metrics.IncrementFailedRequestNum("FetchMetricsSummary") + errorResponse(c, err) + return + } + + metricSummary := &MetricSummary{ + AvgThroughput: avgThroughput, + } + + s.metrics.IncrementSuccessfulRequestNum("FetchMetricsSummary") + c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxMetricAage)) + c.JSON(http.StatusOK, metricSummary) } -func (s *ServerV2) FetchMetricsThroughputHandler(c *gin.Context) { - errorResponse(c, errors.New("FetchMetricsThroughputHandler unimplemented")) +// FetchMetricsThroughputTimeseriesHandler godoc +// +// @Summary Fetch throughput time series +// @Tags Metrics +// @Produce json +// @Param start query int false "Start unix timestamp [default: 1 hour ago]" +// @Param end query int false "End unix timestamp [default: unix time now]" +// @Success 200 {object} []Throughput +// @Failure 400 {object} ErrorResponse "error: Bad request" +// @Failure 404 {object} ErrorResponse "error: Not found" +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /metrics/timeseries/throughput [get] +func (s *ServerV2) FetchMetricsThroughputTimeseriesHandler(c *gin.Context) { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("FetchMetricsThroughputTimeseriesHandler", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + now := time.Now() + start, err := strconv.ParseInt(c.DefaultQuery("start", "0"), 10, 64) + if err != nil || start == 0 { + start = now.Add(-time.Hour * 1).Unix() + } + + end, err := strconv.ParseInt(c.DefaultQuery("end", "0"), 10, 64) + if err != nil || end == 0 { + end = now.Unix() + } + + ths, err := s.metricsHandler.getThroughputTimeseries(c.Request.Context(), start, end) + if err != nil { + 
s.metrics.IncrementFailedRequestNum("FetchMetricsThroughputTimeseriesHandler") + errorResponse(c, err) + return + } + + s.metrics.IncrementSuccessfulRequestNum("FetchMetricsThroughputTimeseriesHandler") + c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxThroughputAge)) + c.JSON(http.StatusOK, ths) } diff --git a/disperser/dataapi/server_v2_test.go b/disperser/dataapi/server_v2_test.go index 17458e5718..e3ce17a4fe 100644 --- a/disperser/dataapi/server_v2_test.go +++ b/disperser/dataapi/server_v2_test.go @@ -29,6 +29,7 @@ import ( "github.com/consensys/gnark-crypto/ecc/bn254/fp" "github.com/google/uuid" "github.com/ory/dockertest/v3" + "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -336,3 +337,76 @@ func TestFetchOperatorsStake(t *testing.T) { assert.Equal(t, opId1.Hex(), ops[0].OperatorId) assert.Equal(t, opId0.Hex(), ops[1].OperatorId) } + +func TestFetchMetricsSummaryHandler(t *testing.T) { + r := setUpRouter() + + s := new(model.SampleStream) + err := s.UnmarshalJSON([]byte(mockPrometheusResponse)) + assert.NoError(t, err) + + matrix := make(model.Matrix, 0) + matrix = append(matrix, s) + mockPrometheusApi.On("QueryRange").Return(matrix, nil, nil).Once() + + r.GET("/v2/metrics/summary", testDataApiServerV2.FetchMetricsSummaryHandler) + + req := httptest.NewRequest(http.MethodGet, "/v2/metrics/summary", nil) + req.Close = true + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + + res := w.Result() + defer res.Body.Close() + + data, err := io.ReadAll(res.Body) + assert.NoError(t, err) + + var response dataapi.MetricSummary + err = json.Unmarshal(data, &response) + assert.NoError(t, err) + assert.NotNil(t, response) + + assert.Equal(t, http.StatusOK, res.StatusCode) + assert.Equal(t, 16555.555555555555, response.AvgThroughput) +} + +func TestFetchMetricsThroughputTimeseriesHandler(t *testing.T) { + r := setUpRouter() + + s := new(model.SampleStream) + err := 
s.UnmarshalJSON([]byte(mockPrometheusRespAvgThroughput)) + assert.NoError(t, err) + + matrix := make(model.Matrix, 0) + matrix = append(matrix, s) + mockPrometheusApi.On("QueryRange").Return(matrix, nil, nil).Once() + + r.GET("/v2/metrics/timeseries/throughput", testDataApiServerV2.FetchMetricsThroughputTimeseriesHandler) + + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v2/metrics/timeseries/throughput", nil) + r.ServeHTTP(w, req) + + res := w.Result() + defer res.Body.Close() + + data, err := io.ReadAll(res.Body) + assert.NoError(t, err) + + var response []*dataapi.Throughput + err = json.Unmarshal(data, &response) + assert.NoError(t, err) + assert.NotNil(t, response) + + var totalThroughput float64 + for _, v := range response { + totalThroughput += v.Throughput + } + + assert.Equal(t, http.StatusOK, res.StatusCode) + assert.Equal(t, 3361, len(response)) + assert.Equal(t, float64(12000), response[0].Throughput) + assert.Equal(t, uint64(1701292920), response[0].Timestamp) + assert.Equal(t, float64(3.503022666666651e+07), totalThroughput) +}