Add encoder queueing stats for autoscaling (#910)
jianoaix authored Nov 19, 2024
1 parent afd5894 commit cf32712
Showing 4 changed files with 86 additions and 39 deletions.
34 changes: 2 additions & 32 deletions disperser/batcher/metrics.go
@@ -7,6 +7,7 @@ import (

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/common"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
@@ -370,7 +371,7 @@ func (e *EncodingStreamerMetrics) UpdateEncodedBlobs(count int, size uint64) {
}

func (e *EncodingStreamerMetrics) ObserveEncodingLatency(state string, quorumId core.QuorumID, blobSize int, latencyMs float64) {
e.BlobEncodingLatency.WithLabelValues(state, fmt.Sprintf("%d", quorumId), blobSizeBucket(blobSize)).Observe(latencyMs)
e.BlobEncodingLatency.WithLabelValues(state, fmt.Sprintf("%d", quorumId), common.BlobSizeBucket(blobSize)).Observe(latencyMs)
}

func (t *TxnManagerMetrics) ObserveLatency(stage string, latencyMs float64) {
@@ -408,34 +409,3 @@ func (f *FinalizerMetrics) UpdateLastSeenFinalizedBlock(blockNumber uint64) {
func (f *FinalizerMetrics) ObserveLatency(stage string, latencyMs float64) {
f.Latency.WithLabelValues(stage).Observe(latencyMs)
}

// blobSizeBucket maps the given blob size (in bytes) into a bucket whose
// boundaries are powers of two.
func blobSizeBucket(blobSize int) string {
switch {
case blobSize <= 32*1024:
return "32KiB"
case blobSize <= 64*1024:
return "64KiB"
case blobSize <= 128*1024:
return "128KiB"
case blobSize <= 256*1024:
return "256KiB"
case blobSize <= 512*1024:
return "512KiB"
case blobSize <= 1024*1024:
return "1MiB"
case blobSize <= 2*1024*1024:
return "2MiB"
case blobSize <= 4*1024*1024:
return "4MiB"
case blobSize <= 8*1024*1024:
return "8MiB"
case blobSize <= 16*1024*1024:
return "16MiB"
case blobSize <= 32*1024*1024:
return "32MiB"
default:
return "invalid"
}
}
42 changes: 42 additions & 0 deletions disperser/common/utils.go
@@ -0,0 +1,42 @@
package common

// BlobSizeBucket maps the given blob size (in bytes) into a bucket whose
// boundaries are powers of two.
func BlobSizeBucket(blobSize int) string {
switch {
case blobSize <= 1*1024:
return "1KiB"
case blobSize <= 2*1024:
return "2KiB"
case blobSize <= 4*1024:
return "4KiB"
case blobSize <= 8*1024:
return "8KiB"
case blobSize <= 16*1024:
return "16KiB"
case blobSize <= 32*1024:
return "32KiB"
case blobSize <= 64*1024:
return "64KiB"
case blobSize <= 128*1024:
return "128KiB"
case blobSize <= 256*1024:
return "256KiB"
case blobSize <= 512*1024:
return "512KiB"
case blobSize <= 1024*1024:
return "1MiB"
case blobSize <= 2*1024*1024:
return "2MiB"
case blobSize <= 4*1024*1024:
return "4MiB"
case blobSize <= 8*1024*1024:
return "8MiB"
case blobSize <= 16*1024*1024:
return "16MiB"
case blobSize <= 32*1024*1024:
return "32MiB"
default:
return "invalid"
}
}
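
The bucket upper bounds are inclusive, so a blob of exactly 32*1024 bytes lands in "32KiB", one byte more rolls into "64KiB", and anything above 32 MiB falls through to "invalid". A minimal table-driven test sketch of that boundary behavior (illustrative only, not part of this commit):

package common

import "testing"

// TestBlobSizeBucket is a hypothetical sketch checking the inclusive upper
// bounds of each bucket; it is not included in this change.
func TestBlobSizeBucket(t *testing.T) {
	cases := []struct {
		size int
		want string
	}{
		{1024, "1KiB"},                // exactly 1 KiB stays in the 1KiB bucket
		{1025, "2KiB"},                // one byte over rolls into the next bucket
		{32 * 1024, "32KiB"},          // upper edges are inclusive
		{32*1024*1024 + 1, "invalid"}, // anything above 32 MiB is unbucketed
	}
	for _, c := range cases {
		if got := BlobSizeBucket(c.size); got != c.want {
			t.Errorf("BlobSizeBucket(%d) = %q, want %q", c.size, got, c.want)
		}
	}
}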
16 changes: 16 additions & 0 deletions disperser/encoder/metrics.go
@@ -6,6 +6,7 @@ import (
"net/http"
"time"

"github.com/Layr-Labs/eigenda/disperser/common"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
@@ -26,6 +27,7 @@ type Metrics struct {
NumEncodeBlobRequests *prometheus.CounterVec
BlobSizeTotal *prometheus.CounterVec
Latency *prometheus.SummaryVec
BlobQueue *prometheus.GaugeVec
}

func NewMetrics(httpPort string, logger logging.Logger) *Metrics {
@@ -62,6 +64,14 @@ func NewMetrics(httpPort string, logger logging.Logger) *Metrics {
},
[]string{"time"}, // time is either encoding or total
),
BlobQueue: promauto.With(reg).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "eigenda_encoder",
Name: "blob_queue",
Help: "the number of blobs in the queue for encoding",
},
[]string{"size_bucket"},
),
}
}

@@ -97,6 +107,12 @@ func (m *Metrics) ObserveLatency(stage string, duration time.Duration) {
m.Latency.WithLabelValues(stage).Observe(float64(duration.Milliseconds()))
}

func (m *Metrics) ObserveQueue(queueStats map[int]int) {
for blobSize, num := range queueStats {
m.BlobQueue.With(prometheus.Labels{"size_bucket": common.BlobSizeBucket(blobSize)}).Set(float64(num))
}
}

func (m *Metrics) Start(ctx context.Context) {
m.logger.Info("Starting metrics server at ", "port", m.httpPort)

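ObserveQueue keys the internal stats map by exact blob size in bytes and only collapses sizes into power-of-two buckets at export time; because it calls Set rather than Add, two distinct raw sizes that share a bucket overwrite each other within one export pass. A rough usage sketch of the new gauge, assuming the package is imported as encoder and that eigensdk-go provides a no-op logger constructor (both are assumptions, not shown in this commit):

package main

import (
	"github.com/Layr-Labs/eigenda/disperser/encoder"
	"github.com/Layr-Labs/eigensdk-go/logging"
)

func main() {
	// Assumption: logging.NewNoopLogger exists in eigensdk-go; substitute any logging.Logger.
	m := encoder.NewMetrics("9100", logging.NewNoopLogger())

	// Three ~60 KiB blobs and one ~500 KiB blob currently waiting to be encoded.
	m.ObserveQueue(map[int]int{
		60 * 1024:  3,
		500 * 1024: 1,
	})

	// Once the metrics server has been started with m.Start, the endpoint would
	// expose series along the lines of:
	//   eigenda_encoder_blob_queue{size_bucket="64KiB"}  3
	//   eigenda_encoder_blob_queue{size_bucket="512KiB"} 1
	// which an autoscaler can scrape to decide how many encoder replicas to run.
}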
33 changes: 26 additions & 7 deletions disperser/encoder/server.go
@@ -6,6 +6,7 @@ import (
"fmt"
"log"
"net"
"sync"
"time"

"github.com/Layr-Labs/eigenda/common/healthcheck"
@@ -27,7 +28,14 @@ type EncoderServer struct {
close func()

runningRequests chan struct{}
requestPool chan struct{}
requestPool chan blobRequest

queueStats map[int]int
queueLock sync.Mutex
}

type blobRequest struct {
blobSizeByte int
}

func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encoding.Prover, metrics *Metrics) *EncoderServer {
Expand All @@ -38,7 +46,8 @@ func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encodin
metrics: metrics,

runningRequests: make(chan struct{}, config.MaxConcurrentRequests),
requestPool: make(chan struct{}, config.RequestPoolSize),
requestPool: make(chan blobRequest, config.RequestPoolSize),
queueStats: make(map[int]int),
}
}

@@ -80,36 +89,46 @@ func (s *EncoderServer) Close() {

func (s *EncoderServer) EncodeBlob(ctx context.Context, req *pb.EncodeBlobRequest) (*pb.EncodeBlobReply, error) {
startTime := time.Now()
blobSize := len(req.GetData())
select {
case s.requestPool <- struct{}{}:
case s.requestPool <- blobRequest{blobSizeByte: blobSize}:
default:
s.metrics.IncrementRateLimitedBlobRequestNum(len(req.GetData()))
s.logger.Warn("rate limiting as request pool is full", "requestPoolSize", s.config.RequestPoolSize, "maxConcurrentRequests", s.config.MaxConcurrentRequests)
return nil, errors.New("too many requests")
}
s.queueLock.Lock()
s.queueStats[blobSize]++
s.metrics.ObserveQueue(s.queueStats)
s.queueLock.Unlock()

s.runningRequests <- struct{}{}
defer s.popRequest()

if ctx.Err() != nil {
s.metrics.IncrementCanceledBlobRequestNum(len(req.GetData()))
s.metrics.IncrementCanceledBlobRequestNum(blobSize)
return nil, ctx.Err()
}

s.metrics.ObserveLatency("queuing", time.Since(startTime))
reply, err := s.handleEncoding(ctx, req)
if err != nil {
s.metrics.IncrementFailedBlobRequestNum(len(req.GetData()))
s.metrics.IncrementFailedBlobRequestNum(blobSize)
} else {
s.metrics.IncrementSuccessfulBlobRequestNum(len(req.GetData()))
s.metrics.IncrementSuccessfulBlobRequestNum(blobSize)
}
s.metrics.ObserveLatency("total", time.Since(startTime))

return reply, err
}

func (s *EncoderServer) popRequest() {
<-s.requestPool
blobRequest := <-s.requestPool
<-s.runningRequests
s.queueLock.Lock()
s.queueStats[blobRequest.blobSizeByte]--
s.metrics.ObserveQueue(s.queueStats)
s.queueLock.Unlock()
}

func (s *EncoderServer) handleEncoding(ctx context.Context, req *pb.EncodeBlobRequest) (*pb.EncodeBlobReply, error) {
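The accounting pattern in server.go is: record the size when a request is admitted into requestPool, carry it through the channel inside blobRequest, and decrement the same key in popRequest, with queueLock guarding both the map update and the gauge export. A stripped-down, standalone sketch of that pattern (names are illustrative, not the server's actual API):

package main

import (
	"fmt"
	"sync"
)

// sizedRequest plays the role of blobRequest: it carries the blob size through
// the queue so the dequeue side can decrement the matching counter key.
type sizedRequest struct {
	sizeBytes int
}

type queueTracker struct {
	mu    sync.Mutex
	stats map[int]int // blob size in bytes -> number currently queued
	pool  chan sizedRequest
}

func newQueueTracker(poolSize int) *queueTracker {
	return &queueTracker{
		stats: make(map[int]int),
		pool:  make(chan sizedRequest, poolSize),
	}
}

// enqueue admits a request if the pool has room and records it in stats.
func (q *queueTracker) enqueue(sizeBytes int) bool {
	select {
	case q.pool <- sizedRequest{sizeBytes: sizeBytes}:
	default:
		return false // pool full: the real server rate-limits here
	}
	q.mu.Lock()
	q.stats[sizeBytes]++
	q.mu.Unlock()
	return true
}

// dequeue removes one request and decrements the matching counter.
func (q *queueTracker) dequeue() {
	r := <-q.pool
	q.mu.Lock()
	q.stats[r.sizeBytes]--
	q.mu.Unlock()
}

func main() {
	q := newQueueTracker(4)
	q.enqueue(60 * 1024)
	q.enqueue(60 * 1024)
	q.enqueue(500 * 1024)
	q.dequeue() // buffered channels are FIFO, so this pops a 60 KiB request
	q.mu.Lock()
	fmt.Println(q.stats) // map[61440:1 512000:1]
	q.mu.Unlock()
}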
