diff --git a/disperser/dataapi/metrics_handler.go b/disperser/dataapi/metrics_handler.go
new file mode 100644
index 000000000..9ccbc6fbc
--- /dev/null
+++ b/disperser/dataapi/metrics_handler.go
@@ -0,0 +1,63 @@
+package dataapi
+
+import (
+	"context"
+	"time"
+)
+
+const (
+	defaultThroughputRateSecs  = 240 // 4m rate is used for < 7d window to match $__rate_interval
+	sevenDayThroughputRateSecs = 660 // 11m rate is used for >= 7d window to match $__rate_interval
+)
+
+// metricsHandler handles operations to collect metrics about the Disperser.
+type metricsHandler struct {
+	// For accessing metrics info
+	promClient PrometheusClient
+}
+
+func newMetricsHandler(promClient PrometheusClient) *metricsHandler {
+	return &metricsHandler{
+		promClient: promClient,
+	}
+}
+
+func (mh *metricsHandler) getAvgThroughput(ctx context.Context, startTime int64, endTime int64) (float64, error) {
+	result, err := mh.promClient.QueryDisperserBlobSizeBytesPerSecond(ctx, time.Unix(startTime, 0), time.Unix(endTime, 0))
+	if err != nil {
+		return 0, err
+	}
+	size := len(result.Values)
+	if size == 0 {
+		return 0, nil
+	}
+	totalBytes := result.Values[size-1].Value - result.Values[0].Value
+	timeDuration := result.Values[size-1].Timestamp.Sub(result.Values[0].Timestamp).Seconds()
+	return totalBytes / timeDuration, nil
+}
+
+func (mh *metricsHandler) getThroughputTimeseries(ctx context.Context, startTime int64, endTime int64) ([]*Throughput, error) {
+	throughputRateSecs := uint16(defaultThroughputRateSecs)
+	if endTime-startTime >= 7*24*60*60 {
+		throughputRateSecs = uint16(sevenDayThroughputRateSecs)
+	}
+	result, err := mh.promClient.QueryDisperserAvgThroughputBlobSizeBytes(ctx, time.Unix(startTime, 0), time.Unix(endTime, 0), throughputRateSecs)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(result.Values) <= 1 {
+		return []*Throughput{}, nil
+	}
+
+	throughputs := make([]*Throughput, 0)
+	for i := throughputRateSecs; i < uint16(len(result.Values)); i++ {
+		v := result.Values[i]
+		throughputs = append(throughputs, &Throughput{
+			Timestamp:  uint64(v.Timestamp.Unix()),
+			Throughput: v.Value,
+		})
+	}
+
+	return throughputs, nil
+}
diff --git a/disperser/dataapi/metrics_handlers.go b/disperser/dataapi/metrics_handlers.go
index 9b6c68dcb..e317c5cfd 100644
--- a/disperser/dataapi/metrics_handlers.go
+++ b/disperser/dataapi/metrics_handlers.go
@@ -5,17 +5,10 @@ import (
 	"errors"
 	"fmt"
 	"math/big"
-	"time"
 
 	"github.com/Layr-Labs/eigenda/core"
 )
 
-const (
-	maxWorkersGetOperatorState = 10  // The maximum number of workers to use when querying operator state.
-	defaultThroughputRateSecs  = 240 // 4m rate is used for < 7d window to match $__rate_interval
-	sevenDayThroughputRateSecs = 660 // 11m rate is used for >= 7d window to match $__rate_interval
-)
-
 func (s *server) getMetric(ctx context.Context, startTime int64, endTime int64) (*Metric, error) {
 	blockNumber, err := s.transactor.GetCurrentBlockNumber(ctx)
 	if err != nil {
@@ -49,23 +42,11 @@ func (s *server) getMetric(ctx context.Context, startTime int64, endTime int64)
 		}
 	}
 
-	result, err := s.promClient.QueryDisperserBlobSizeBytesPerSecond(ctx, time.Unix(startTime, 0), time.Unix(endTime, 0))
+	throughput, err := s.metricsHandler.getAvgThroughput(ctx, startTime, endTime)
 	if err != nil {
 		return nil, err
 	}
 
-	var (
-		totalBytes   float64
-		timeDuration float64
-		throughput   float64
-		valuesSize   = len(result.Values)
-	)
-	if valuesSize > 1 {
-		totalBytes = result.Values[valuesSize-1].Value - result.Values[0].Value
-		timeDuration = result.Values[valuesSize-1].Timestamp.Sub(result.Values[0].Timestamp).Seconds()
-		throughput = totalBytes / timeDuration
-	}
-
 	costInGas, err := s.calculateTotalCostGasUsed(ctx)
 	if err != nil {
 		return nil, err
@@ -79,32 +60,6 @@ func (s *server) getMetric(ctx context.Context, startTime int64, endTime int64)
 	}, nil
 }
 
-func (s *server) getThroughput(ctx context.Context, start int64, end int64) ([]*Throughput, error) {
-	throughputRateSecs := uint16(defaultThroughputRateSecs)
-	if end-start >= 7*24*60*60 {
-		throughputRateSecs = uint16(sevenDayThroughputRateSecs)
-	}
-	result, err := s.promClient.QueryDisperserAvgThroughputBlobSizeBytes(ctx, time.Unix(start, 0), time.Unix(end, 0), throughputRateSecs)
-	if err != nil {
-		return nil, err
-	}
-
-	if len(result.Values) <= 1 {
-		return []*Throughput{}, nil
-	}
-
-	throughputs := make([]*Throughput, 0)
-	for i := throughputRateSecs; i < uint16(len(result.Values)); i++ {
-		v := result.Values[i]
-		throughputs = append(throughputs, &Throughput{
-			Timestamp:  uint64(v.Timestamp.Unix()),
-			Throughput: v.Value,
-		})
-	}
-
-	return throughputs, nil
-}
-
 func (s *server) calculateTotalCostGasUsed(ctx context.Context) (float64, error) {
 	batches, err := s.subgraphClient.QueryBatchesWithLimit(ctx, 1, 0)
 	if err != nil {
diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go
index eb6ee4f7e..a04e3d3e8 100644
--- a/disperser/dataapi/server.go
+++ b/disperser/dataapi/server.go
@@ -209,6 +209,7 @@ type (
 		eigenDAHttpServiceChecker EigenDAHttpServiceChecker
 
 		operatorHandler *operatorHandler
+		metricsHandler  *metricsHandler
 	}
 )
 
@@ -260,6 +261,7 @@ func NewServer(
 		eigenDAGRPCServiceChecker: eigenDAGRPCServiceChecker,
 		eigenDAHttpServiceChecker: eigenDAHttpServiceChecker,
 		operatorHandler:           newOperatorHandler(logger, metrics, transactor, chainState, indexedChainState, subgraphClient),
+		metricsHandler:            newMetricsHandler(promClient),
 	}
 }
 
@@ -607,7 +609,7 @@ func (s *server) FetchMetricsThroughputHandler(c *gin.Context) {
 		end = now.Unix()
 	}
 
-	ths, err := s.getThroughput(c.Request.Context(), start, end)
+	ths, err := s.metricsHandler.getThroughputTimeseries(c.Request.Context(), start, end)
 	if err != nil {
 		s.metrics.IncrementFailedRequestNum("FetchMetricsTroughput")
 		errorResponse(c, err)
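For context, a small self-contained sketch (not part of the patch) of the arithmetic `getAvgThroughput` performs on the Prometheus samples: the byte counter's delta between the first and last sample in the window, divided by the elapsed seconds. The `sample` type and the values in `main` are hypothetical stand-ins, not the real `PrometheusClient` result type.

```go
package main

import (
	"fmt"
	"time"
)

// sample is a hypothetical stand-in for one Prometheus sample of the
// disperser's cumulative blob-size counter.
type sample struct {
	Timestamp time.Time
	Value     float64 // cumulative bytes dispersed
}

// avgThroughput mirrors the calculation in getAvgThroughput: counter delta
// between the first and last sample, divided by the elapsed seconds.
func avgThroughput(values []sample) float64 {
	if len(values) < 2 {
		return 0
	}
	totalBytes := values[len(values)-1].Value - values[0].Value
	duration := values[len(values)-1].Timestamp.Sub(values[0].Timestamp).Seconds()
	if duration == 0 {
		return 0
	}
	return totalBytes / duration
}

func main() {
	now := time.Now()
	values := []sample{
		{Timestamp: now.Add(-10 * time.Minute), Value: 1_000_000},
		{Timestamp: now, Value: 61_000_000},
	}
	// 60 MB accumulated over 600 s => 100000 bytes/s.
	fmt.Printf("avg throughput: %.0f bytes/s\n", avgThroughput(values))
}
```

As in the patch, the handler returns 0 for an empty result set, and `getThroughputTimeseries` selects the 240 s or 660 s rate window depending on whether the queried range spans at least seven days, matching Grafana's `$__rate_interval`.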