Skip to content

Commit

Permalink
Use variable rate interval for blob thoughput metrics based on time i…
Browse files Browse the repository at this point in the history
…nterval

DA internal dashboard blob throughput does not match blob explorer throughput graphs because dataapi query uses 2m rate interval and DA dashboard uses auto `$__rate_interval`

See https://grafana.com/blog/2020/09/28/new-in-grafana-7.2-__rate_interval-for-prometheus-rate-queries-that-just-work/

The problem is that `$__rate_interval` is a grafana only feature and can not be used for raw prometheus queries. This change converts dataAPI metrics to use the auto variable rate based
on query interval length and changes the default rate from 2m to 4m to match what `$__rate_interval` does under the covers.

Use `4m` for < 7d time interval
Use `11m` for >= 7d time interval
  • Loading branch information
pschork committed Jun 26, 2024
1 parent b911361 commit d1484e7
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 16 deletions.
11 changes: 8 additions & 3 deletions disperser/dataapi/metrics_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (
)

const (
avgThroughputWindowSize = 120 // The time window (in seconds) to calculate the data throughput.
maxWorkersGetOperatorState = 10 // The maximum number of workers to use when querying operator state.
defaultThroughputRateSecs = 240 // 4m rate is used for < 7d window to match $__rate_interval
sevenDayThroughputRateSecs = 660 // 11m rate is used for >= 7d window to match $__rate_interval
)

func (s *server) getMetric(ctx context.Context, startTime int64, endTime int64) (*Metric, error) {
Expand Down Expand Up @@ -79,7 +80,11 @@ func (s *server) getMetric(ctx context.Context, startTime int64, endTime int64)
}

func (s *server) getThroughput(ctx context.Context, start int64, end int64) ([]*Throughput, error) {
result, err := s.promClient.QueryDisperserAvgThroughputBlobSizeBytes(ctx, time.Unix(start, 0), time.Unix(end, 0), avgThroughputWindowSize)
throughputRateSecs := uint16(defaultThroughputRateSecs)
if end-start >= 7*24*60*60 {
throughputRateSecs = uint16(sevenDayThroughputRateSecs)
}
result, err := s.promClient.QueryDisperserAvgThroughputBlobSizeBytes(ctx, time.Unix(start, 0), time.Unix(end, 0), throughputRateSecs)
if err != nil {
return nil, err
}
Expand All @@ -89,7 +94,7 @@ func (s *server) getThroughput(ctx context.Context, start int64, end int64) ([]*
}

throughputs := make([]*Throughput, 0)
for i := avgThroughputWindowSize; i < len(result.Values); i++ {
for i := throughputRateSecs; i < uint16(len(result.Values)); i++ {
v := result.Values[i]
throughputs = append(throughputs, &Throughput{
Timestamp: uint64(v.Timestamp.Unix()),
Expand Down
13 changes: 4 additions & 9 deletions disperser/dataapi/prometheus_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ import (

const (
// maxNumOfDataPoints is the maximum number of data points that can be queried from Prometheus based on latency that this API can provide
maxNumOfDataPoints = 3500
throughputRateWindowInSec = 60
maxNumOfDataPoints = 3500
)

type (
PrometheusClient interface {
QueryDisperserBlobSizeBytesPerSecond(ctx context.Context, start time.Time, end time.Time) (*PrometheusResult, error)
QueryDisperserAvgThroughputBlobSizeBytes(ctx context.Context, start time.Time, end time.Time, windowSizeInSec uint8) (*PrometheusResult, error)
QueryDisperserAvgThroughputBlobSizeBytes(ctx context.Context, start time.Time, end time.Time, windowSizeInSec uint16) (*PrometheusResult, error)
}

PrometheusResultValues struct {
Expand Down Expand Up @@ -47,12 +46,8 @@ func (pc *prometheusClient) QueryDisperserBlobSizeBytesPerSecond(ctx context.Con
return pc.queryRange(ctx, query, start, end)
}

func (pc *prometheusClient) QueryDisperserAvgThroughputBlobSizeBytes(ctx context.Context, start time.Time, end time.Time, windowSizeInSec uint8) (*PrometheusResult, error) {
if windowSizeInSec < throughputRateWindowInSec {
windowSizeInSec = throughputRateWindowInSec
}

query := fmt.Sprintf("sum by (job) (rate(eigenda_batcher_blobs_total{state=\"confirmed\",data=\"size\",cluster=\"%s\"}[%ds]))", pc.cluster, windowSizeInSec)
func (pc *prometheusClient) QueryDisperserAvgThroughputBlobSizeBytes(ctx context.Context, start time.Time, end time.Time, throughputRateSecs uint16) (*PrometheusResult, error) {
query := fmt.Sprintf("sum by (job) (rate(eigenda_batcher_blobs_total{state=\"confirmed\",data=\"size\",cluster=\"%s\"}[%ds]))", pc.cluster, throughputRateSecs)
return pc.queryRange(ctx, query, start, end)
}

Expand Down
8 changes: 4 additions & 4 deletions disperser/dataapi/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,10 @@ func TestFetchMetricsThroughputHandler(t *testing.T) {
}

assert.Equal(t, http.StatusOK, res.StatusCode)
assert.Equal(t, 3481, len(response))
assert.Equal(t, float64(11666.666666666666), response[0].Throughput)
assert.Equal(t, uint64(1701292800), response[0].Timestamp)
assert.Equal(t, float64(3.599722666666646e+07), totalThroughput)
assert.Equal(t, 3361, len(response))
assert.Equal(t, float64(12000), response[0].Throughput)
assert.Equal(t, uint64(1701292920), response[0].Timestamp)
assert.Equal(t, float64(3.503022666666651e+07), totalThroughput)
}

func TestEjectOperatorHandler(t *testing.T) {
Expand Down

0 comments on commit d1484e7

Please sign in to comment.