Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the metrics computing at dataapi #1039

Merged
merged 2 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions disperser/dataapi/metrics_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package dataapi

import (
"context"
"time"
)

const (
defaultThroughputRateSecs = 240 // 4m rate is used for < 7d window to match $__rate_interval
sevenDayThroughputRateSecs = 660 // 11m rate is used for >= 7d window to match $__rate_interval
)

// metricHandler handles operations to collect metrics about the Disperser.
type metricsHandler struct {
// For accessing metrics info
promClient PrometheusClient
}

func newMetricsHandler(promClient PrometheusClient) *metricsHandler {
return &metricsHandler{
promClient: promClient,
}
}

func (mh *metricsHandler) getAvgThroughput(ctx context.Context, startTime int64, endTime int64) (float64, error) {
result, err := mh.promClient.QueryDisperserBlobSizeBytesPerSecond(ctx, time.Unix(startTime, 0), time.Unix(endTime, 0))
if err != nil {
return 0, err
}
size := len(result.Values)
if size == 0 {
return 0, nil
}
totalBytes := result.Values[size-1].Value - result.Values[0].Value
timeDuration := result.Values[size-1].Timestamp.Sub(result.Values[0].Timestamp).Seconds()
return totalBytes / timeDuration, nil
}

func (mh *metricsHandler) getThroughputTimeseries(ctx context.Context, startTime int64, endTime int64) ([]*Throughput, error) {
throughputRateSecs := uint16(defaultThroughputRateSecs)
if endTime-startTime >= 7*24*60*60 {
throughputRateSecs = uint16(sevenDayThroughputRateSecs)
}
result, err := mh.promClient.QueryDisperserAvgThroughputBlobSizeBytes(ctx, time.Unix(startTime, 0), time.Unix(endTime, 0), throughputRateSecs)
if err != nil {
return nil, err
}

if len(result.Values) <= 1 {
return []*Throughput{}, nil
}

throughputs := make([]*Throughput, 0)
for i := throughputRateSecs; i < uint16(len(result.Values)); i++ {
v := result.Values[i]
throughputs = append(throughputs, &Throughput{
Timestamp: uint64(v.Timestamp.Unix()),
Throughput: v.Value,
})
}

return throughputs, nil
}
47 changes: 1 addition & 46 deletions disperser/dataapi/metrics_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,10 @@ import (
"errors"
"fmt"
"math/big"
"time"

"github.com/Layr-Labs/eigenda/core"
)

const (
maxWorkersGetOperatorState = 10 // The maximum number of workers to use when querying operator state.
defaultThroughputRateSecs = 240 // 4m rate is used for < 7d window to match $__rate_interval
sevenDayThroughputRateSecs = 660 // 11m rate is used for >= 7d window to match $__rate_interval
)

func (s *server) getMetric(ctx context.Context, startTime int64, endTime int64) (*Metric, error) {
blockNumber, err := s.transactor.GetCurrentBlockNumber(ctx)
if err != nil {
Expand Down Expand Up @@ -49,23 +42,11 @@ func (s *server) getMetric(ctx context.Context, startTime int64, endTime int64)
}
}

result, err := s.promClient.QueryDisperserBlobSizeBytesPerSecond(ctx, time.Unix(startTime, 0), time.Unix(endTime, 0))
throughput, err := s.metricsHandler.getAvgThroughput(ctx, startTime, endTime)
if err != nil {
return nil, err
}

var (
totalBytes float64
timeDuration float64
throughput float64
valuesSize = len(result.Values)
)
if valuesSize > 1 {
totalBytes = result.Values[valuesSize-1].Value - result.Values[0].Value
timeDuration = result.Values[valuesSize-1].Timestamp.Sub(result.Values[0].Timestamp).Seconds()
throughput = totalBytes / timeDuration
}

costInGas, err := s.calculateTotalCostGasUsed(ctx)
if err != nil {
return nil, err
Expand All @@ -79,32 +60,6 @@ func (s *server) getMetric(ctx context.Context, startTime int64, endTime int64)
}, nil
}

func (s *server) getThroughput(ctx context.Context, start int64, end int64) ([]*Throughput, error) {
throughputRateSecs := uint16(defaultThroughputRateSecs)
if end-start >= 7*24*60*60 {
throughputRateSecs = uint16(sevenDayThroughputRateSecs)
}
result, err := s.promClient.QueryDisperserAvgThroughputBlobSizeBytes(ctx, time.Unix(start, 0), time.Unix(end, 0), throughputRateSecs)
if err != nil {
return nil, err
}

if len(result.Values) <= 1 {
return []*Throughput{}, nil
}

throughputs := make([]*Throughput, 0)
for i := throughputRateSecs; i < uint16(len(result.Values)); i++ {
v := result.Values[i]
throughputs = append(throughputs, &Throughput{
Timestamp: uint64(v.Timestamp.Unix()),
Throughput: v.Value,
})
}

return throughputs, nil
}

func (s *server) calculateTotalCostGasUsed(ctx context.Context) (float64, error) {
batches, err := s.subgraphClient.QueryBatchesWithLimit(ctx, 1, 0)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion disperser/dataapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ type (
eigenDAHttpServiceChecker EigenDAHttpServiceChecker

operatorHandler *operatorHandler
metricsHandler *metricsHandler
}
)

Expand Down Expand Up @@ -260,6 +261,7 @@ func NewServer(
eigenDAGRPCServiceChecker: eigenDAGRPCServiceChecker,
eigenDAHttpServiceChecker: eigenDAHttpServiceChecker,
operatorHandler: newOperatorHandler(logger, metrics, transactor, chainState, indexedChainState, subgraphClient),
metricsHandler: newMetricsHandler(promClient),
}
}

Expand Down Expand Up @@ -607,7 +609,7 @@ func (s *server) FetchMetricsThroughputHandler(c *gin.Context) {
end = now.Unix()
}

ths, err := s.getThroughput(c.Request.Context(), start, end)
ths, err := s.metricsHandler.getThroughputTimeseries(c.Request.Context(), start, end)
if err != nil {
s.metrics.IncrementFailedRequestNum("FetchMetricsTroughput")
errorResponse(c, err)
Expand Down
Loading