Skip to content

Commit

Permalink
Use variable rate interval for blob throughput metrics based on time i…
Browse files Browse the repository at this point in the history
…nterval (#621)
  • Loading branch information
pschork authored Jun 26, 2024
1 parent b911361 commit 65b5725
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 16 deletions.
11 changes: 8 additions & 3 deletions disperser/dataapi/metrics_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (
)

const (
avgThroughputWindowSize = 120 // The time window (in seconds) to calculate the data throughput.
maxWorkersGetOperatorState = 10 // The maximum number of workers to use when querying operator state.
defaultThroughputRateSecs = 240 // 4m rate is used for < 7d window to match $__rate_interval
sevenDayThroughputRateSecs = 660 // 11m rate is used for >= 7d window to match $__rate_interval
)

func (s *server) getMetric(ctx context.Context, startTime int64, endTime int64) (*Metric, error) {
Expand Down Expand Up @@ -79,7 +80,11 @@ func (s *server) getMetric(ctx context.Context, startTime int64, endTime int64)
}

func (s *server) getThroughput(ctx context.Context, start int64, end int64) ([]*Throughput, error) {
result, err := s.promClient.QueryDisperserAvgThroughputBlobSizeBytes(ctx, time.Unix(start, 0), time.Unix(end, 0), avgThroughputWindowSize)
throughputRateSecs := uint16(defaultThroughputRateSecs)
if end-start >= 7*24*60*60 {
throughputRateSecs = uint16(sevenDayThroughputRateSecs)
}
result, err := s.promClient.QueryDisperserAvgThroughputBlobSizeBytes(ctx, time.Unix(start, 0), time.Unix(end, 0), throughputRateSecs)
if err != nil {
return nil, err
}
Expand All @@ -89,7 +94,7 @@ func (s *server) getThroughput(ctx context.Context, start int64, end int64) ([]*
}

throughputs := make([]*Throughput, 0)
for i := avgThroughputWindowSize; i < len(result.Values); i++ {
for i := throughputRateSecs; i < uint16(len(result.Values)); i++ {
v := result.Values[i]
throughputs = append(throughputs, &Throughput{
Timestamp: uint64(v.Timestamp.Unix()),
Expand Down
13 changes: 4 additions & 9 deletions disperser/dataapi/prometheus_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ import (

const (
// maxNumOfDataPoints is the maximum number of data points that can be queried from Prometheus based on latency that this API can provide
maxNumOfDataPoints = 3500
throughputRateWindowInSec = 60
maxNumOfDataPoints = 3500
)

type (
PrometheusClient interface {
QueryDisperserBlobSizeBytesPerSecond(ctx context.Context, start time.Time, end time.Time) (*PrometheusResult, error)
QueryDisperserAvgThroughputBlobSizeBytes(ctx context.Context, start time.Time, end time.Time, windowSizeInSec uint8) (*PrometheusResult, error)
QueryDisperserAvgThroughputBlobSizeBytes(ctx context.Context, start time.Time, end time.Time, windowSizeInSec uint16) (*PrometheusResult, error)
}

PrometheusResultValues struct {
Expand Down Expand Up @@ -47,12 +46,8 @@ func (pc *prometheusClient) QueryDisperserBlobSizeBytesPerSecond(ctx context.Con
return pc.queryRange(ctx, query, start, end)
}

func (pc *prometheusClient) QueryDisperserAvgThroughputBlobSizeBytes(ctx context.Context, start time.Time, end time.Time, windowSizeInSec uint8) (*PrometheusResult, error) {
if windowSizeInSec < throughputRateWindowInSec {
windowSizeInSec = throughputRateWindowInSec
}

query := fmt.Sprintf("sum by (job) (rate(eigenda_batcher_blobs_total{state=\"confirmed\",data=\"size\",cluster=\"%s\"}[%ds]))", pc.cluster, windowSizeInSec)
func (pc *prometheusClient) QueryDisperserAvgThroughputBlobSizeBytes(ctx context.Context, start time.Time, end time.Time, throughputRateSecs uint16) (*PrometheusResult, error) {
query := fmt.Sprintf("sum by (job) (rate(eigenda_batcher_blobs_total{state=\"confirmed\",data=\"size\",cluster=\"%s\"}[%ds]))", pc.cluster, throughputRateSecs)
return pc.queryRange(ctx, query, start, end)
}

Expand Down
8 changes: 4 additions & 4 deletions disperser/dataapi/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,10 @@ func TestFetchMetricsThroughputHandler(t *testing.T) {
}

assert.Equal(t, http.StatusOK, res.StatusCode)
assert.Equal(t, 3481, len(response))
assert.Equal(t, float64(11666.666666666666), response[0].Throughput)
assert.Equal(t, uint64(1701292800), response[0].Timestamp)
assert.Equal(t, float64(3.599722666666646e+07), totalThroughput)
assert.Equal(t, 3361, len(response))
assert.Equal(t, float64(12000), response[0].Throughput)
assert.Equal(t, uint64(1701292920), response[0].Timestamp)
assert.Equal(t, float64(3.503022666666651e+07), totalThroughput)
}

func TestEjectOperatorHandler(t *testing.T) {
Expand Down

0 comments on commit 65b5725

Please sign in to comment.