diff --git a/disperser/batcher/metrics.go b/disperser/batcher/metrics.go index 8ccabcb004..b0762a1f96 100644 --- a/disperser/batcher/metrics.go +++ b/disperser/batcher/metrics.go @@ -7,6 +7,7 @@ import ( "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigenda/disperser/common" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" @@ -370,7 +371,7 @@ func (e *EncodingStreamerMetrics) UpdateEncodedBlobs(count int, size uint64) { } func (e *EncodingStreamerMetrics) ObserveEncodingLatency(state string, quorumId core.QuorumID, blobSize int, latencyMs float64) { - e.BlobEncodingLatency.WithLabelValues(state, fmt.Sprintf("%d", quorumId), blobSizeBucket(blobSize)).Observe(latencyMs) + e.BlobEncodingLatency.WithLabelValues(state, fmt.Sprintf("%d", quorumId), common.BlobSizeBucket(blobSize)).Observe(latencyMs) } func (t *TxnManagerMetrics) ObserveLatency(stage string, latencyMs float64) { @@ -408,34 +409,3 @@ func (f *FinalizerMetrics) UpdateLastSeenFinalizedBlock(blockNumber uint64) { func (f *FinalizerMetrics) ObserveLatency(stage string, latencyMs float64) { f.Latency.WithLabelValues(stage).Observe(latencyMs) } - -// blobSizeBucket maps the blob size into a bucket that's defined according to -// the power of 2. -func blobSizeBucket(blobSize int) string { - switch { - case blobSize <= 32*1024: - return "32KiB" - case blobSize <= 64*1024: - return "64KiB" - case blobSize <= 128*1024: - return "128KiB" - case blobSize <= 256*1024: - return "256KiB" - case blobSize <= 512*1024: - return "512KiB" - case blobSize <= 1024*1024: - return "1MiB" - case blobSize <= 2*1024*1024: - return "2MiB" - case blobSize <= 4*1024*1024: - return "4MiB" - case blobSize <= 8*1024*1024: - return "8MiB" - case blobSize <= 16*1024*1024: - return "16MiB" - case blobSize <= 32*1024*1024: - return "32MiB" - default: - return "invalid" - } -} diff --git a/disperser/common/utils.go b/disperser/common/utils.go new file mode 100644 index 0000000000..5d2d4ca799 --- /dev/null +++ b/disperser/common/utils.go @@ -0,0 +1,42 @@ +package common + +// BlobSizeBucket maps the blob size into a bucket that's defined according to +// the power of 2. +func BlobSizeBucket(blobSize int) string { + switch { + case blobSize <= 1*1024: + return "1KiB" + case blobSize <= 2*1024: + return "2KiB" + case blobSize <= 4*1024: + return "4KiB" + case blobSize <= 8*1024: + return "8KiB" + case blobSize <= 16*1024: + return "16KiB" + case blobSize <= 32*1024: + return "32KiB" + case blobSize <= 64*1024: + return "64KiB" + case blobSize <= 128*1024: + return "128KiB" + case blobSize <= 256*1024: + return "256KiB" + case blobSize <= 512*1024: + return "512KiB" + case blobSize <= 1024*1024: + return "1MiB" + case blobSize <= 2*1024*1024: + return "2MiB" + case blobSize <= 4*1024*1024: + return "4MiB" + case blobSize <= 8*1024*1024: + return "8MiB" + case blobSize <= 16*1024*1024: + return "16MiB" + case blobSize <= 32*1024*1024: + return "32MiB" + default: + return "invalid" + } +} diff --git a/disperser/encoder/metrics.go b/disperser/encoder/metrics.go index 008cd16573..a68071bb90 100644 --- a/disperser/encoder/metrics.go +++ b/disperser/encoder/metrics.go @@ -6,6 +6,7 @@ import ( "net/http" "time" + "github.com/Layr-Labs/eigenda/disperser/common" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" @@ -26,6 +27,7 @@ type Metrics struct { NumEncodeBlobRequests *prometheus.CounterVec BlobSizeTotal *prometheus.CounterVec Latency *prometheus.SummaryVec + BlobQueue *prometheus.GaugeVec } func NewMetrics(httpPort string, logger logging.Logger) *Metrics { @@ -62,6 +64,14 @@ func NewMetrics(httpPort string, logger logging.Logger) *Metrics { }, []string{"time"}, // time is either encoding or total ), + BlobQueue: promauto.With(reg).NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "eigenda_encoder", + Name: "blob_queue", + Help: "the number of blobs in the queue for encoding", + }, + []string{"size_bucket"}, + ), } } @@ -97,6 +107,12 @@ func (m *Metrics) ObserveLatency(stage string, duration time.Duration) { m.Latency.WithLabelValues(stage).Observe(float64(duration.Milliseconds())) } +func (m *Metrics) ObserveQueue(queueStats map[int]int) { + for blobSize, num := range queueStats { + m.BlobQueue.With(prometheus.Labels{"size_bucket": common.BlobSizeBucket(blobSize)}).Set(float64(num)) + } +} + func (m *Metrics) Start(ctx context.Context) { m.logger.Info("Starting metrics server at ", "port", m.httpPort) diff --git a/disperser/encoder/server.go b/disperser/encoder/server.go index 5d6a54a8c0..b02dd358d7 100644 --- a/disperser/encoder/server.go +++ b/disperser/encoder/server.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "net" + "sync" "time" "github.com/Layr-Labs/eigenda/common/healthcheck" @@ -27,7 +28,14 @@ type EncoderServer struct { close func() runningRequests chan struct{} - requestPool chan struct{} + requestPool chan blobRequest + + queueStats map[int]int + queueLock sync.Mutex +} + +type blobRequest struct { + blobSizeByte int } func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encoding.Prover, metrics *Metrics) *EncoderServer { @@ -38,7 +46,8 @@ func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encodin metrics: metrics, runningRequests: make(chan struct{}, config.MaxConcurrentRequests), - requestPool: make(chan struct{}, config.RequestPoolSize), + requestPool: make(chan blobRequest, config.RequestPoolSize), + queueStats: make(map[int]int), } } @@ -80,27 +89,33 @@ func (s *EncoderServer) Close() { func (s *EncoderServer) EncodeBlob(ctx context.Context, req *pb.EncodeBlobRequest) (*pb.EncodeBlobReply, error) { startTime := time.Now() + blobSize := len(req.GetData()) select { - case s.requestPool <- struct{}{}: + case s.requestPool <- blobRequest{blobSizeByte: blobSize}: default: s.metrics.IncrementRateLimitedBlobRequestNum(len(req.GetData())) s.logger.Warn("rate limiting as request pool is full", "requestPoolSize", s.config.RequestPoolSize, "maxConcurrentRequests", s.config.MaxConcurrentRequests) return nil, errors.New("too many requests") } + s.queueLock.Lock() + s.queueStats[blobSize]++ + s.metrics.ObserveQueue(s.queueStats) + s.queueLock.Unlock() + s.runningRequests <- struct{}{} defer s.popRequest() if ctx.Err() != nil { - s.metrics.IncrementCanceledBlobRequestNum(len(req.GetData())) + s.metrics.IncrementCanceledBlobRequestNum(blobSize) return nil, ctx.Err() } s.metrics.ObserveLatency("queuing", time.Since(startTime)) reply, err := s.handleEncoding(ctx, req) if err != nil { - s.metrics.IncrementFailedBlobRequestNum(len(req.GetData())) + s.metrics.IncrementFailedBlobRequestNum(blobSize) } else { - s.metrics.IncrementSuccessfulBlobRequestNum(len(req.GetData())) + s.metrics.IncrementSuccessfulBlobRequestNum(blobSize) } s.metrics.ObserveLatency("total", time.Since(startTime)) @@ -108,8 +123,12 @@ func (s *EncoderServer) EncodeBlob(ctx context.Context, req *pb.EncodeBlobReques } func (s *EncoderServer) popRequest() { - <-s.requestPool + blobRequest := <-s.requestPool <-s.runningRequests + s.queueLock.Lock() + s.queueStats[blobRequest.blobSizeByte]-- + s.metrics.ObserveQueue(s.queueStats) + s.queueLock.Unlock() } func (s *EncoderServer) handleEncoding(ctx context.Context, req *pb.EncodeBlobRequest) (*pb.EncodeBlobReply, error) {