Skip to content

Commit

Permalink
Revert "Add more blob metadata mretrics" (#324)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmanc authored Mar 7, 2024
1 parent 75e1ea8 commit 32e13d5
Showing 1 changed file with 13 additions and 76 deletions.
89 changes: 13 additions & 76 deletions disperser/dataapi/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package dataapi

import (
"context"
"errors"
"fmt"
"net/http"
"time"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/disperser"
Expand Down Expand Up @@ -37,7 +35,6 @@ func NewMetrics(blobMetadataStore *blobstore.BlobMetadataStore, httpPort string,
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
reg.MustRegister(collectors.NewGoCollector())
reg.MustRegister(NewDynamoDBCollector(blobMetadataStore, logger))

metrics := &Metrics{
NumRequests: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Expand Down Expand Up @@ -88,7 +85,6 @@ func (g *Metrics) IncrementFailedRequestNum(method string) {
func (g *Metrics) Start(ctx context.Context) {
g.logger.Info("Starting metrics server at ", "port", g.httpPort)
addr := fmt.Sprintf(":%s", g.httpPort)

go func() {
log := g.logger
mux := http.NewServeMux()
Expand All @@ -102,97 +98,38 @@ func (g *Metrics) Start(ctx context.Context) {
}

type DynamoDBCollector struct {
blobMetadataStore *blobstore.BlobMetadataStore
blobStatusMetric *prometheus.Desc
scrapeDurationMetric *prometheus.GaugeVec
logger common.Logger
blobMetadataStore *blobstore.BlobMetadataStore
blobStatusMetric *prometheus.Desc
logger common.Logger
}

func NewDynamoDBCollector(blobMetadataStore *blobstore.BlobMetadataStore, logger common.Logger) *DynamoDBCollector {
if blobMetadataStore == nil {
logger.Error("BlobMetadataStore is nil, metrics will not be collected")
}

collector := &DynamoDBCollector{
return &DynamoDBCollector{
blobMetadataStore: blobMetadataStore,
blobStatusMetric: prometheus.NewDesc("dynamodb_blob_metadata_status_count",
"Number of blobs with specific status in DynamoDB",
[]string{"status"},
nil,
),
scrapeDurationMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "dynamodb_collector_scrape_duration_seconds",
Help: "Gauge of scrape duration for DynamoDB collector",
}, []string{}),
logger: logger,
}

return collector
}

// Describe sends the descriptors of the metrics this collector exposes on ch:
// first the blob status descriptor, then whatever descriptors the scrape
// duration gauge vector reports through its own Describe method.
func (collector *DynamoDBCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- collector.blobStatusMetric
collector.scrapeDurationMetric.Describe(ch)
}

// Collect queries the blob metadata store for the number of blobs in each
// tracked status and sends one gauge sample per status on ch, followed by the
// scrape duration gauge.
//
// Every status is queried under its own timeout. A status whose query fails
// or times out is logged and skipped, so the remaining statuses are still
// reported for this scrape.
func (collector *DynamoDBCollector) Collect(ch chan<- prometheus.Metric) {
	// Upper bound for a single per-status query. The timeout log below reports
	// this same value so the message cannot drift from the real limit.
	const queryTimeout = 30 * time.Second

	// Record the start time of the scrape
	startTime := time.Now()

	for _, status := range []disperser.BlobStatus{
		disperser.Processing,
		disperser.Confirmed,
		disperser.Failed,
		disperser.InsufficientSignatures,
	} {
		ctx, cancel := context.WithTimeout(context.Background(), queryTimeout)
		count, err := collector.getBlobMetadataByStatus(ctx, status)
		// Release the context as soon as the query returns. A defer here would
		// only run when Collect returns, keeping every iteration's timer alive.
		cancel()

		if err != nil {
			if errors.Is(err, context.DeadlineExceeded) {
				collector.logger.Error("Fetching blob metadata by status timed out", "status", status, "timeout", queryTimeout)
			} else {
				collector.logger.Error("Failed to get count of blob metadata by status", "status", status, "err", err)
			}
			continue
		}

		ch <- prometheus.MustNewConstMetric(
			collector.blobStatusMetric,
			prometheus.GaugeValue,
			float64(count),
			status.String(),
		)
	}

	// Record the scrape duration
	duration := time.Since(startTime).Seconds()
	collector.scrapeDurationMetric.WithLabelValues().Set(duration)
	collector.scrapeDurationMetric.Collect(ch)
}

// getBlobMetadataByStatus fetches the count of blob metadata by status from DynamoDB.
// It uses pagination to fetch all the metadata by status and returns the total count.
func (collector *DynamoDBCollector) getBlobMetadataByStatus(ctx context.Context, status disperser.BlobStatus) (int, error) {
totalMetadata := 0

metadatas, exclusiveStartKey, err := collector.blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, status, 1000, nil)
count, err := collector.blobMetadataStore.GetBlobMetadataByStatusCount(context.Background(), disperser.Processing)
if err != nil {
collector.logger.Error("failed to get blob metadata by status with pagination", "status", status.String(), "err", err)
return 0, err
}
totalMetadata += len(metadatas) // Count the first batch of metadata

for exclusiveStartKey != nil {
metadatas, exclusiveStartKey, err = collector.blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, status, 1000, exclusiveStartKey)
if err != nil {
collector.logger.Error("failed to get blob metadata by status with pagination in loop", "status", status.String(), "err", err)
return totalMetadata, err
}

totalMetadata += len(metadatas)
collector.logger.Error("failed to get count of blob metadata by status", "err", err)
return
}

return totalMetadata, nil
ch <- prometheus.MustNewConstMetric(
collector.blobStatusMetric,
prometheus.GaugeValue,
float64(count),
disperser.Processing.String(),
)
}

0 comments on commit 32e13d5

Please sign in to comment.