From 4a98d987891904f17f84a024ab782a9dd1b57f1f Mon Sep 17 00:00:00 2001 From: Cody Littley <56973212+cody-littley@users.noreply.github.com> Date: Wed, 11 Dec 2024 08:23:41 -0600 Subject: [PATCH] Add metrics to v2 api server. (#969) Signed-off-by: Cody Littley --- disperser/apiserver/disperse_blob_v2.go | 12 ++ disperser/apiserver/get_blob_status_v2.go | 6 + disperser/apiserver/metrics_v2.go | 145 ++++++++++++++++++++++ disperser/apiserver/server_v2.go | 24 +++- disperser/apiserver/server_v2_test.go | 21 +++- disperser/cmd/apiserver/main.go | 4 +- 6 files changed, 200 insertions(+), 12 deletions(-) create mode 100644 disperser/apiserver/metrics_v2.go diff --git a/disperser/apiserver/disperse_blob_v2.go b/disperser/apiserver/disperse_blob_v2.go index 01626ffb8c..230d2a82c0 100644 --- a/disperser/apiserver/disperse_blob_v2.go +++ b/disperser/apiserver/disperse_blob_v2.go @@ -16,6 +16,11 @@ import ( ) func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBlobRequest) (*pb.DisperseBlobReply, error) { + start := time.Now() + defer func() { + s.metrics.reportDisperseBlobLatency(time.Since(start)) + }() + onchainState := s.onchainState.Load() if onchainState == nil { return nil, api.NewErrorInternal("onchain state is nil") @@ -25,6 +30,11 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl return nil, err } + finishedValidation := time.Now() + s.metrics.reportValidateDispersalRequestLatency(finishedValidation.Sub(start)) + + s.metrics.reportDisperseBlobSize(len(req.GetData())) + data := req.GetData() blobHeader, err := corev2.BlobHeaderFromProtobuf(req.GetBlobHeader()) if err != nil { @@ -37,6 +47,8 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl return nil, err } + s.metrics.reportStoreBlobLatency(time.Since(finishedValidation)) + return &pb.DisperseBlobReply{ Result: dispv2.Queued.ToProfobuf(), BlobKey: blobKey[:], diff --git a/disperser/apiserver/get_blob_status_v2.go b/disperser/apiserver/get_blob_status_v2.go index 9717de65e1..e4d32475a8 100644 --- a/disperser/apiserver/get_blob_status_v2.go +++ b/disperser/apiserver/get_blob_status_v2.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "time" "github.com/Layr-Labs/eigenda/api" pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2" @@ -13,6 +14,11 @@ import ( ) func (s *DispersalServerV2) GetBlobStatus(ctx context.Context, req *pb.BlobStatusRequest) (*pb.BlobStatusReply, error) { + start := time.Now() + defer func() { + s.metrics.reportGetBlobStatusLatency(time.Since(start)) + }() + if req.GetBlobKey() == nil || len(req.GetBlobKey()) != 32 { return nil, api.NewErrorInvalidArg("invalid blob key") } diff --git a/disperser/apiserver/metrics_v2.go b/disperser/apiserver/metrics_v2.go new file mode 100644 index 0000000000..25f9e7fd3c --- /dev/null +++ b/disperser/apiserver/metrics_v2.go @@ -0,0 +1,145 @@ +package apiserver + +import ( + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "google.golang.org/grpc" + "time" +) + +const namespace = "eigenda_disperser_api" + +// metricsV2 encapsulates the metrics for the v2 API server. +type metricsV2 struct { + grpcServerOption grpc.ServerOption + + getBlobCommitmentLatency *prometheus.SummaryVec + getPaymentStateLatency *prometheus.SummaryVec + disperseBlobLatency *prometheus.SummaryVec + disperseBlobSize *prometheus.GaugeVec + validateDispersalRequestLatency *prometheus.SummaryVec + storeBlobLatency *prometheus.SummaryVec + getBlobStatusLatency *prometheus.SummaryVec +} + +// newAPIServerV2Metrics creates a new metricsV2 instance. +func newAPIServerV2Metrics(registry *prometheus.Registry) *metricsV2 { + grpcMetrics := grpcprom.NewServerMetrics() + registry.MustRegister(grpcMetrics) + + grpcServerOption := grpc.UnaryInterceptor( + grpcMetrics.UnaryServerInterceptor(), + ) + + objectives := map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001} + + getBlobCommitmentLatency := promauto.With(registry).NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Name: "get_blob_commitment_latency_ms", + Help: "The time required to get the blob commitment.", + Objectives: objectives, + }, + []string{}, + ) + + getPaymentStateLatency := promauto.With(registry).NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Name: "get_payment_state_latency_ms", + Help: "The time required to get the payment state.", + Objectives: objectives, + }, + []string{}, + ) + + disperseBlobLatency := promauto.With(registry).NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Name: "disperse_blob_latency_ms", + Help: "The time required to disperse a blob.", + Objectives: objectives, + }, + []string{}, + ) + + disperseBlobSize := promauto.With(registry).NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "disperse_blob_size_bytes", + Help: "The size of the blob in bytes.", + }, + []string{}, + ) + + validateDispersalRequestLatency := promauto.With(registry).NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Name: "validate_dispersal_request_latency_ms", + Help: "The time required to validate a dispersal request.", + Objectives: objectives, + }, + []string{}, + ) + + storeBlobLatency := promauto.With(registry).NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Name: "store_blob_latency_ms", + Help: "The time required to store a blob.", + Objectives: objectives, + }, + []string{}, + ) + + getBlobStatusLatency := promauto.With(registry).NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Name: "get_blob_status_latency_ms", + Help: "The time required to get the blob status.", + Objectives: objectives, + }, + []string{}, + ) + + return &metricsV2{ + grpcServerOption: grpcServerOption, + getBlobCommitmentLatency: getBlobCommitmentLatency, + getPaymentStateLatency: getPaymentStateLatency, + disperseBlobLatency: disperseBlobLatency, + disperseBlobSize: disperseBlobSize, + validateDispersalRequestLatency: validateDispersalRequestLatency, + storeBlobLatency: storeBlobLatency, + getBlobStatusLatency: getBlobStatusLatency, + } +} + +func (m *metricsV2) reportGetBlobCommitmentLatency(duration time.Duration) { + m.getBlobCommitmentLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) +} + +func (m *metricsV2) reportGetPaymentStateLatency(duration time.Duration) { + m.getPaymentStateLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) +} + +func (m *metricsV2) reportDisperseBlobLatency(duration time.Duration) { + m.disperseBlobLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) +} + +func (m *metricsV2) reportDisperseBlobSize(size int) { + m.disperseBlobSize.WithLabelValues().Set(float64(size)) +} + +func (m *metricsV2) reportValidateDispersalRequestLatency(duration time.Duration) { + m.validateDispersalRequestLatency.WithLabelValues().Observe( + float64(duration.Nanoseconds()) / float64(time.Millisecond)) +} + +func (m *metricsV2) reportStoreBlobLatency(duration time.Duration) { + m.storeBlobLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) +} + +func (m *metricsV2) reportGetBlobStatusLatency(duration time.Duration) { + m.getBlobStatusLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) +} diff --git a/disperser/apiserver/server_v2.go b/disperser/apiserver/server_v2.go index cb96f8b40f..877598a3f0 100644 --- a/disperser/apiserver/server_v2.go +++ b/disperser/apiserver/server_v2.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/prometheus/client_golang/prometheus" "net" "sync/atomic" "time" @@ -12,8 +13,7 @@ import ( pbcommon "github.com/Layr-Labs/eigenda/api/grpc/common" pbv1 "github.com/Layr-Labs/eigenda/api/grpc/disperser" pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2" - "github.com/Layr-Labs/eigenda/common" - healthcheck "github.com/Layr-Labs/eigenda/common/healthcheck" + "github.com/Layr-Labs/eigenda/common/healthcheck" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/meterer" corev2 "github.com/Layr-Labs/eigenda/core/v2" @@ -56,22 +56,23 @@ type DispersalServerV2 struct { onchainState atomic.Pointer[OnchainState] maxNumSymbolsPerBlob uint64 onchainStateRefreshInterval time.Duration + + metrics *metricsV2 } // NewDispersalServerV2 creates a new Server struct with the provided parameters. func NewDispersalServerV2( serverConfig disperser.ServerConfig, - rateConfig RateConfig, blobStore *blobstore.BlobStore, blobMetadataStore *blobstore.BlobMetadataStore, chainReader core.Reader, - ratelimiter common.RateLimiter, meterer *meterer.Meterer, authenticator corev2.BlobRequestAuthenticator, prover encoding.Prover, maxNumSymbolsPerBlob uint64, onchainStateRefreshInterval time.Duration, _logger logging.Logger, + registry *prometheus.Registry, ) *DispersalServerV2 { logger := _logger.With("component", "DispersalServerV2") @@ -88,6 +89,8 @@ func NewDispersalServerV2( maxNumSymbolsPerBlob: maxNumSymbolsPerBlob, onchainStateRefreshInterval: onchainStateRefreshInterval, + + metrics: newAPIServerV2Metrics(registry), } } @@ -101,7 +104,7 @@ func (s *DispersalServerV2) Start(ctx context.Context) error { opt := grpc.MaxRecvMsgSize(1024 * 1024 * 300) // 300 MiB - gs := grpc.NewServer(opt) + gs := grpc.NewServer(opt, s.metrics.grpcServerOption) reflection.Register(gs) pb.RegisterDisperserServer(gs, s) @@ -142,6 +145,11 @@ func (s *DispersalServerV2) Start(ctx context.Context) error { } func (s *DispersalServerV2) GetBlobCommitment(ctx context.Context, req *pb.BlobCommitmentRequest) (*pb.BlobCommitmentReply, error) { + start := time.Now() + defer func() { + s.metrics.reportGetBlobCommitmentLatency(time.Since(start)) + }() + if s.prover == nil { return nil, api.NewErrorUnimplemented() } @@ -223,7 +231,13 @@ func (s *DispersalServerV2) RefreshOnchainState(ctx context.Context) error { } func (s *DispersalServerV2) GetPaymentState(ctx context.Context, req *pb.GetPaymentStateRequest) (*pb.GetPaymentStateReply, error) { + start := time.Now() + defer func() { + s.metrics.reportGetPaymentStateLatency(time.Since(start)) + }() + accountID := gethcommon.HexToAddress(req.AccountId) + // validate the signature if err := s.authenticator.AuthenticatePaymentStateRequest(req.GetSignature(), req.GetAccountId()); err != nil { return nil, api.NewErrorInvalidArg(fmt.Sprintf("authentication failed: %s", err.Error())) diff --git a/disperser/apiserver/server_v2_test.go b/disperser/apiserver/server_v2_test.go index d5f8262872..1d9d76e4e8 100644 --- a/disperser/apiserver/server_v2_test.go +++ b/disperser/apiserver/server_v2_test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/rand" "fmt" + "github.com/prometheus/client_golang/prometheus" "math/big" "net" "testing" @@ -430,7 +431,6 @@ func newTestServerV2(t *testing.T) *testComponents { blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, v2MetadataTableName) blobStore := blobstore.NewBlobStore(s3BucketName, s3Client, logger) chainReader := &mock.MockWriter{} - rateConfig := apiserver.RateConfig{} // append test name to each table name for an unique store mockState := &mock.MockOnchainPaymentState{} @@ -491,10 +491,21 @@ func newTestServerV2(t *testing.T) *testComponents { }, }, nil) - s := apiserver.NewDispersalServerV2(disperser.ServerConfig{ - GrpcPort: "51002", - GrpcTimeout: 1 * time.Second, - }, rateConfig, blobStore, blobMetadataStore, chainReader, nil, meterer, auth.NewAuthenticator(), prover, 10, time.Hour, logger) + s := apiserver.NewDispersalServerV2( + disperser.ServerConfig{ + GrpcPort: "51002", + GrpcTimeout: 1 * time.Second, + }, + blobStore, + blobMetadataStore, + chainReader, + meterer, + auth.NewAuthenticator(), + prover, + 10, + time.Hour, + logger, + prometheus.NewRegistry()) err = s.RefreshOnchainState(context.Background()) assert.NoError(t, err) diff --git a/disperser/cmd/apiserver/main.go b/disperser/cmd/apiserver/main.go index 9a5752b6a7..e28e3aeb79 100644 --- a/disperser/cmd/apiserver/main.go +++ b/disperser/cmd/apiserver/main.go @@ -172,17 +172,16 @@ func RunDisperserServer(ctx *cli.Context) error { server := apiserver.NewDispersalServerV2( config.ServerConfig, - config.RateConfig, blobStore, blobMetadataStore, transactor, - ratelimiter, meterer, authv2.NewAuthenticator(), prover, uint64(config.MaxNumSymbolsPerBlob), config.OnchainStateRefreshInterval, logger, + reg, ) return server.Start(context.Background()) } @@ -210,6 +209,7 @@ func RunDisperserServer(ctx *cli.Context) error { // Enable Metrics Block if config.MetricsConfig.EnableMetrics { httpSocket := fmt.Sprintf(":%s", config.MetricsConfig.HTTPPort) + // TODO(cody-littley): once we deprecate v1, move all remaining metrics functionality to metrics_v2.go metrics.Start(context.Background()) logger.Info("Enabled metrics for Disperser", "socket", httpSocket) }