Refactor the metrics computation in dataapi (#1039)
jianoaix authored Dec 19, 2024
1 parent ac36126 commit 45be21e
Showing 3 changed files with 67 additions and 47 deletions.
63 changes: 63 additions & 0 deletions disperser/dataapi/metrics_handler.go
@@ -0,0 +1,63 @@
package dataapi

import (
    "context"
    "time"
)

const (
    defaultThroughputRateSecs  = 240 // 4m rate is used for < 7d window to match $__rate_interval
    sevenDayThroughputRateSecs = 660 // 11m rate is used for >= 7d window to match $__rate_interval
)

// metricsHandler handles operations to collect metrics about the Disperser.
type metricsHandler struct {
    // For accessing metrics info
    promClient PrometheusClient
}

func newMetricsHandler(promClient PrometheusClient) *metricsHandler {
    return &metricsHandler{
        promClient: promClient,
    }
}

func (mh *metricsHandler) getAvgThroughput(ctx context.Context, startTime int64, endTime int64) (float64, error) {
    result, err := mh.promClient.QueryDisperserBlobSizeBytesPerSecond(ctx, time.Unix(startTime, 0), time.Unix(endTime, 0))
    if err != nil {
        return 0, err
    }
    size := len(result.Values)
    if size == 0 {
        return 0, nil
    }
    totalBytes := result.Values[size-1].Value - result.Values[0].Value
    timeDuration := result.Values[size-1].Timestamp.Sub(result.Values[0].Timestamp).Seconds()
    return totalBytes / timeDuration, nil
}

func (mh *metricsHandler) getThroughputTimeseries(ctx context.Context, startTime int64, endTime int64) ([]*Throughput, error) {
    throughputRateSecs := uint16(defaultThroughputRateSecs)
    if endTime-startTime >= 7*24*60*60 {
        throughputRateSecs = uint16(sevenDayThroughputRateSecs)
    }
    result, err := mh.promClient.QueryDisperserAvgThroughputBlobSizeBytes(ctx, time.Unix(startTime, 0), time.Unix(endTime, 0), throughputRateSecs)
    if err != nil {
        return nil, err
    }

    if len(result.Values) <= 1 {
        return []*Throughput{}, nil
    }

    throughputs := make([]*Throughput, 0)
    for i := throughputRateSecs; i < uint16(len(result.Values)); i++ {
        v := result.Values[i]
        throughputs = append(throughputs, &Throughput{
            Timestamp:  uint64(v.Timestamp.Unix()),
            Throughput: v.Value,
        })
    }

    return throughputs, nil
}
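For readers skimming the diff, the averaging in getAvgThroughput is simply "bytes accumulated between the first and last sample, divided by the elapsed seconds". The following self-contained sketch restates that arithmetic; the sample type and the avgThroughput helper are hypothetical stand-ins for the Prometheus result values used above and are not part of the dataapi package.

package main

import (
    "fmt"
    "time"
)

// sample is a hypothetical stand-in for one Prometheus data point:
// a timestamp plus the cumulative number of bytes dispersed so far.
type sample struct {
    Timestamp time.Time
    Value     float64
}

// avgThroughput mirrors the arithmetic in getAvgThroughput: the difference
// between the last and first cumulative byte counts, divided by the elapsed
// seconds between them.
func avgThroughput(samples []sample) float64 {
    n := len(samples)
    if n < 2 {
        return 0
    }
    totalBytes := samples[n-1].Value - samples[0].Value
    elapsed := samples[n-1].Timestamp.Sub(samples[0].Timestamp).Seconds()
    return totalBytes / elapsed
}

func main() {
    start := time.Now()
    samples := []sample{
        {Timestamp: start, Value: 0},
        {Timestamp: start.Add(60 * time.Second), Value: 6_000_000},
        {Timestamp: start.Add(120 * time.Second), Value: 12_000_000},
    }
    // 12 MB accumulated over 120 s => 100,000 bytes/second.
    fmt.Printf("avg throughput: %.0f bytes/s\n", avgThroughput(samples))
}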
47 changes: 1 addition & 46 deletions disperser/dataapi/metrics_handlers.go
@@ -5,17 +5,10 @@ import (
    "errors"
    "fmt"
    "math/big"
    "time"

    "github.com/Layr-Labs/eigenda/core"
)

const (
    maxWorkersGetOperatorState = 10  // The maximum number of workers to use when querying operator state.
    defaultThroughputRateSecs  = 240 // 4m rate is used for < 7d window to match $__rate_interval
    sevenDayThroughputRateSecs = 660 // 11m rate is used for >= 7d window to match $__rate_interval
)

func (s *server) getMetric(ctx context.Context, startTime int64, endTime int64) (*Metric, error) {
    blockNumber, err := s.transactor.GetCurrentBlockNumber(ctx)
    if err != nil {
@@ -49,23 +42,11 @@ func (s *server) getMetric(ctx context.Context, startTime int64, endTime int64)
        }
    }

    result, err := s.promClient.QueryDisperserBlobSizeBytesPerSecond(ctx, time.Unix(startTime, 0), time.Unix(endTime, 0))
    throughput, err := s.metricsHandler.getAvgThroughput(ctx, startTime, endTime)
    if err != nil {
        return nil, err
    }

    var (
        totalBytes   float64
        timeDuration float64
        throughput   float64
        valuesSize   = len(result.Values)
    )
    if valuesSize > 1 {
        totalBytes = result.Values[valuesSize-1].Value - result.Values[0].Value
        timeDuration = result.Values[valuesSize-1].Timestamp.Sub(result.Values[0].Timestamp).Seconds()
        throughput = totalBytes / timeDuration
    }

    costInGas, err := s.calculateTotalCostGasUsed(ctx)
    if err != nil {
        return nil, err
@@ -79,32 +60,6 @@
    }, nil
}

func (s *server) getThroughput(ctx context.Context, start int64, end int64) ([]*Throughput, error) {
    throughputRateSecs := uint16(defaultThroughputRateSecs)
    if end-start >= 7*24*60*60 {
        throughputRateSecs = uint16(sevenDayThroughputRateSecs)
    }
    result, err := s.promClient.QueryDisperserAvgThroughputBlobSizeBytes(ctx, time.Unix(start, 0), time.Unix(end, 0), throughputRateSecs)
    if err != nil {
        return nil, err
    }

    if len(result.Values) <= 1 {
        return []*Throughput{}, nil
    }

    throughputs := make([]*Throughput, 0)
    for i := throughputRateSecs; i < uint16(len(result.Values)); i++ {
        v := result.Values[i]
        throughputs = append(throughputs, &Throughput{
            Timestamp:  uint64(v.Timestamp.Unix()),
            Throughput: v.Value,
        })
    }

    return throughputs, nil
}

func (s *server) calculateTotalCostGasUsed(ctx context.Context) (float64, error) {
    batches, err := s.subgraphClient.QueryBatchesWithLimit(ctx, 1, 0)
    if err != nil {
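The only other piece of logic that moved is the rate-window selection, which picks a Prometheus rate interval wide enough to match Grafana's $__rate_interval for the requested time range: 4 minutes for windows under 7 days, 11 minutes otherwise. A minimal, hypothetical restatement follows; the rateWindowSecs helper does not exist in the package and is only illustrative.

package main

import "fmt"

const (
    defaultThroughputRateSecs  = 240 // 4m, matches $__rate_interval for < 7d windows
    sevenDayThroughputRateSecs = 660 // 11m, matches $__rate_interval for >= 7d windows
)

// rateWindowSecs mirrors the selection done at the top of getThroughputTimeseries.
func rateWindowSecs(start, end int64) uint16 {
    if end-start >= 7*24*60*60 {
        return sevenDayThroughputRateSecs
    }
    return defaultThroughputRateSecs
}

func main() {
    day := int64(24 * 60 * 60)
    fmt.Println(rateWindowSecs(0, 1*day)) // 240
    fmt.Println(rateWindowSecs(0, 7*day)) // 660
}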
4 changes: 3 additions & 1 deletion disperser/dataapi/server.go
@@ -209,6 +209,7 @@ type (
        eigenDAHttpServiceChecker EigenDAHttpServiceChecker

        operatorHandler *operatorHandler
        metricsHandler  *metricsHandler
    }
)

@@ -260,6 +261,7 @@ func NewServer(
        eigenDAGRPCServiceChecker: eigenDAGRPCServiceChecker,
        eigenDAHttpServiceChecker: eigenDAHttpServiceChecker,
        operatorHandler:           newOperatorHandler(logger, metrics, transactor, chainState, indexedChainState, subgraphClient),
        metricsHandler:            newMetricsHandler(promClient),
    }
}

@@ -607,7 +609,7 @@ func (s *server) FetchMetricsThroughputHandler(c *gin.Context) {
        end = now.Unix()
    }

    ths, err := s.getThroughput(c.Request.Context(), start, end)
    ths, err := s.metricsHandler.getThroughputTimeseries(c.Request.Context(), start, end)
    if err != nil {
        s.metrics.IncrementFailedRequestNum("FetchMetricsTroughput")
        errorResponse(c, err)
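To illustrate how the new wiring is consumed end to end, here is a simplified, hypothetical sketch of a Gin endpoint delegating to metricsHandler. It assumes it sits in the dataapi package next to the code in this commit; the function name fetchThroughputSketch and the fixed one-hour window are invented for illustration, while metricsHandler.getThroughputTimeseries and errorResponse are the names used in the diff above.

// Hypothetical sketch, assumed to live in the dataapi package.
// Assumed imports: "net/http", "time", "github.com/gin-gonic/gin".
func (s *server) fetchThroughputSketch(c *gin.Context) {
    // Default to the last hour; the real handler parses start/end from query parameters.
    now := time.Now()
    start := now.Add(-time.Hour).Unix()
    end := now.Unix()

    // Delegate the Prometheus query and shaping of the series to metricsHandler.
    ths, err := s.metricsHandler.getThroughputTimeseries(c.Request.Context(), start, end)
    if err != nil {
        errorResponse(c, err) // existing dataapi helper, as used in the diff above
        return
    }
    c.JSON(http.StatusOK, ths)
}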
