From 6673931d3c400f141e14405467255693c7709b5a Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Thu, 19 Dec 2024 16:49:12 +0000 Subject: [PATCH 1/2] Refactor the metrics computing at dataapi --- disperser/dataapi/metrics_handler.go | 58 +++++++++++++++++++++++++++ disperser/dataapi/metrics_handlers.go | 41 +------------------ disperser/dataapi/server.go | 4 +- 3 files changed, 62 insertions(+), 41 deletions(-) create mode 100644 disperser/dataapi/metrics_handler.go diff --git a/disperser/dataapi/metrics_handler.go b/disperser/dataapi/metrics_handler.go new file mode 100644 index 0000000000..b65c1263af --- /dev/null +++ b/disperser/dataapi/metrics_handler.go @@ -0,0 +1,58 @@ +package dataapi + +import ( + "context" + "time" +) + +// metricHandler handles operations to collect metrics about the Disperser. +type metricsHandler struct { + // For accessing metrics info + promClient PrometheusClient +} + +func newMetricsHandler(promClient PrometheusClient) *metricsHandler { + return &metricsHandler{ + promClient: promClient, + } +} + +func (mh *metricsHandler) getAvgThroughput(ctx context.Context, startTime int64, endTime int64) (float64, error) { + result, err := mh.promClient.QueryDisperserBlobSizeBytesPerSecond(ctx, time.Unix(startTime, 0), time.Unix(endTime, 0)) + if err != nil { + return 0, err + } + size := len(result.Values) + if size == 0 { + return 0, nil + } + totalBytes := result.Values[size-1].Value - result.Values[0].Value + timeDuration := result.Values[size-1].Timestamp.Sub(result.Values[0].Timestamp).Seconds() + return totalBytes / timeDuration, nil +} + +func (mh *metricsHandler) getThroughputTimeseries(ctx context.Context, startTime int64, endTime int64) ([]*Throughput, error) { + throughputRateSecs := uint16(defaultThroughputRateSecs) + if endTime-startTime >= 7*24*60*60 { + throughputRateSecs = uint16(sevenDayThroughputRateSecs) + } + result, err := mh.promClient.QueryDisperserAvgThroughputBlobSizeBytes(ctx, time.Unix(startTime, 0), time.Unix(endTime, 0), throughputRateSecs) + if err != nil { + return nil, err + } + + if len(result.Values) <= 1 { + return []*Throughput{}, nil + } + + throughputs := make([]*Throughput, 0) + for i := throughputRateSecs; i < uint16(len(result.Values)); i++ { + v := result.Values[i] + throughputs = append(throughputs, &Throughput{ + Timestamp: uint64(v.Timestamp.Unix()), + Throughput: v.Value, + }) + } + + return throughputs, nil +} diff --git a/disperser/dataapi/metrics_handlers.go b/disperser/dataapi/metrics_handlers.go index 9b6c68dcb0..fcc8e55111 100644 --- a/disperser/dataapi/metrics_handlers.go +++ b/disperser/dataapi/metrics_handlers.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "math/big" - "time" "github.com/Layr-Labs/eigenda/core" ) @@ -49,23 +48,11 @@ func (s *server) getMetric(ctx context.Context, startTime int64, endTime int64) } } - result, err := s.promClient.QueryDisperserBlobSizeBytesPerSecond(ctx, time.Unix(startTime, 0), time.Unix(endTime, 0)) + throughput, err := s.metricsHandler.getAvgThroughput(ctx, startTime, endTime) if err != nil { return nil, err } - var ( - totalBytes float64 - timeDuration float64 - throughput float64 - valuesSize = len(result.Values) - ) - if valuesSize > 1 { - totalBytes = result.Values[valuesSize-1].Value - result.Values[0].Value - timeDuration = result.Values[valuesSize-1].Timestamp.Sub(result.Values[0].Timestamp).Seconds() - throughput = totalBytes / timeDuration - } - costInGas, err := s.calculateTotalCostGasUsed(ctx) if err != nil { return nil, err @@ -79,32 +66,6 @@ func (s *server) getMetric(ctx context.Context, startTime int64, endTime int64) }, nil } -func (s *server) getThroughput(ctx context.Context, start int64, end int64) ([]*Throughput, error) { - throughputRateSecs := uint16(defaultThroughputRateSecs) - if end-start >= 7*24*60*60 { - throughputRateSecs = uint16(sevenDayThroughputRateSecs) - } - result, err := s.promClient.QueryDisperserAvgThroughputBlobSizeBytes(ctx, time.Unix(start, 0), time.Unix(end, 0), throughputRateSecs) - if err != nil { - return nil, err - } - - if len(result.Values) <= 1 { - return []*Throughput{}, nil - } - - throughputs := make([]*Throughput, 0) - for i := throughputRateSecs; i < uint16(len(result.Values)); i++ { - v := result.Values[i] - throughputs = append(throughputs, &Throughput{ - Timestamp: uint64(v.Timestamp.Unix()), - Throughput: v.Value, - }) - } - - return throughputs, nil -} - func (s *server) calculateTotalCostGasUsed(ctx context.Context) (float64, error) { batches, err := s.subgraphClient.QueryBatchesWithLimit(ctx, 1, 0) if err != nil { diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index eb6ee4f7e5..a04e3d3e8f 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -209,6 +209,7 @@ type ( eigenDAHttpServiceChecker EigenDAHttpServiceChecker operatorHandler *operatorHandler + metricsHandler *metricsHandler } ) @@ -260,6 +261,7 @@ func NewServer( eigenDAGRPCServiceChecker: eigenDAGRPCServiceChecker, eigenDAHttpServiceChecker: eigenDAHttpServiceChecker, operatorHandler: newOperatorHandler(logger, metrics, transactor, chainState, indexedChainState, subgraphClient), + metricsHandler: newMetricsHandler(promClient), } } @@ -607,7 +609,7 @@ func (s *server) FetchMetricsThroughputHandler(c *gin.Context) { end = now.Unix() } - ths, err := s.getThroughput(c.Request.Context(), start, end) + ths, err := s.metricsHandler.getThroughputTimeseries(c.Request.Context(), start, end) if err != nil { s.metrics.IncrementFailedRequestNum("FetchMetricsTroughput") errorResponse(c, err) From 692657a2ba78e26474c588acb184ff7600346bfa Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Thu, 19 Dec 2024 16:52:32 +0000 Subject: [PATCH 2/2] fix --- disperser/dataapi/metrics_handler.go | 5 +++++ disperser/dataapi/metrics_handlers.go | 6 ------ 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/disperser/dataapi/metrics_handler.go b/disperser/dataapi/metrics_handler.go index b65c1263af..9ccbc6fbc5 100644 --- a/disperser/dataapi/metrics_handler.go +++ b/disperser/dataapi/metrics_handler.go @@ -5,6 +5,11 @@ import ( "time" ) +const ( + defaultThroughputRateSecs = 240 // 4m rate is used for < 7d window to match $__rate_interval + sevenDayThroughputRateSecs = 660 // 11m rate is used for >= 7d window to match $__rate_interval +) + // metricHandler handles operations to collect metrics about the Disperser. type metricsHandler struct { // For accessing metrics info diff --git a/disperser/dataapi/metrics_handlers.go b/disperser/dataapi/metrics_handlers.go index fcc8e55111..e317c5cfdb 100644 --- a/disperser/dataapi/metrics_handlers.go +++ b/disperser/dataapi/metrics_handlers.go @@ -9,12 +9,6 @@ import ( "github.com/Layr-Labs/eigenda/core" ) -const ( - maxWorkersGetOperatorState = 10 // The maximum number of workers to use when querying operator state. - defaultThroughputRateSecs = 240 // 4m rate is used for < 7d window to match $__rate_interval - sevenDayThroughputRateSecs = 660 // 11m rate is used for >= 7d window to match $__rate_interval -) - func (s *server) getMetric(ctx context.Context, startTime int64, endTime int64) (*Metric, error) { blockNumber, err := s.transactor.GetCurrentBlockNumber(ctx) if err != nil {