Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Add more blob metadata metrics" #324

Merged
merged 1 commit into from
Mar 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 13 additions & 76 deletions disperser/dataapi/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package dataapi

import (
"context"
"errors"
"fmt"
"net/http"
"time"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/disperser"
Expand Down Expand Up @@ -37,7 +35,6 @@ func NewMetrics(blobMetadataStore *blobstore.BlobMetadataStore, httpPort string,
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
reg.MustRegister(collectors.NewGoCollector())
reg.MustRegister(NewDynamoDBCollector(blobMetadataStore, logger))

metrics := &Metrics{
NumRequests: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Expand Down Expand Up @@ -88,7 +85,6 @@ func (g *Metrics) IncrementFailedRequestNum(method string) {
func (g *Metrics) Start(ctx context.Context) {
g.logger.Info("Starting metrics server at ", "port", g.httpPort)
addr := fmt.Sprintf(":%s", g.httpPort)

go func() {
log := g.logger
mux := http.NewServeMux()
Expand All @@ -102,97 +98,38 @@ func (g *Metrics) Start(ctx context.Context) {
}

type DynamoDBCollector struct {
blobMetadataStore *blobstore.BlobMetadataStore
blobStatusMetric *prometheus.Desc
scrapeDurationMetric *prometheus.GaugeVec
logger common.Logger
blobMetadataStore *blobstore.BlobMetadataStore
blobStatusMetric *prometheus.Desc
logger common.Logger
}

func NewDynamoDBCollector(blobMetadataStore *blobstore.BlobMetadataStore, logger common.Logger) *DynamoDBCollector {
if blobMetadataStore == nil {
logger.Error("BlobMetadataStore is nil, metrics will not be collected")
}

collector := &DynamoDBCollector{
return &DynamoDBCollector{
blobMetadataStore: blobMetadataStore,
blobStatusMetric: prometheus.NewDesc("dynamodb_blob_metadata_status_count",
"Number of blobs with specific status in DynamoDB",
[]string{"status"},
nil,
),
scrapeDurationMetric: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "dynamodb_collector_scrape_duration_seconds",
Help: "Gauge of scrape duration for DynamoDB collector",
}, []string{}),
logger: logger,
}

return collector
}

func (collector *DynamoDBCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- collector.blobStatusMetric
collector.scrapeDurationMetric.Describe(ch)
}

func (collector *DynamoDBCollector) Collect(ch chan<- prometheus.Metric) {
// Record the start time of the scrape
startTime := time.Now()

for _, status := range []disperser.BlobStatus{
disperser.Processing,
disperser.Confirmed,
disperser.Failed,
disperser.InsufficientSignatures,
} {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

count, err := collector.getBlobMetadataByStatus(ctx, status)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
collector.logger.Error("Fetching blob metadata by status took longer than 60 seconds", "status", status)
} else {
collector.logger.Error("Failed to get count of blob metadata by status", "status", status, "err", err)
}
continue
}

ch <- prometheus.MustNewConstMetric(
collector.blobStatusMetric,
prometheus.GaugeValue,
float64(count),
status.String(),
)
}

// Record the scrape duration
duration := time.Since(startTime).Seconds()
collector.scrapeDurationMetric.WithLabelValues().Set(duration)
collector.scrapeDurationMetric.Collect(ch)
}

// getBlobMetadataByStatus fetches the count of blob metadata by status from DynamoDB.
// It uses pagination to fetch all the metadata by status and returns the total count.
func (collector *DynamoDBCollector) getBlobMetadataByStatus(ctx context.Context, status disperser.BlobStatus) (int, error) {
totalMetadata := 0

metadatas, exclusiveStartKey, err := collector.blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, status, 1000, nil)
count, err := collector.blobMetadataStore.GetBlobMetadataByStatusCount(context.Background(), disperser.Processing)
if err != nil {
collector.logger.Error("failed to get blob metadata by status with pagination", "status", status.String(), "err", err)
return 0, err
}
totalMetadata += len(metadatas) // Count the first batch of metadata

for exclusiveStartKey != nil {
metadatas, exclusiveStartKey, err = collector.blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, status, 1000, exclusiveStartKey)
if err != nil {
collector.logger.Error("failed to get blob metadata by status with pagination in loop", "status", status.String(), "err", err)
return totalMetadata, err
}

totalMetadata += len(metadatas)
collector.logger.Error("failed to get count of blob metadata by status", "err", err)
return
}

return totalMetadata, nil
ch <- prometheus.MustNewConstMetric(
collector.blobStatusMetric,
prometheus.GaugeValue,
float64(count),
disperser.Processing.String(),
)
}
Loading