Skip to content

Commit

Permalink
Add metrics to v2 api server. (#969)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley authored Dec 11, 2024
1 parent 4190d49 commit 4a98d98
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 12 deletions.
12 changes: 12 additions & 0 deletions disperser/apiserver/disperse_blob_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ import (
)

func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBlobRequest) (*pb.DisperseBlobReply, error) {
start := time.Now()
defer func() {
s.metrics.reportDisperseBlobLatency(time.Since(start))
}()

onchainState := s.onchainState.Load()
if onchainState == nil {
return nil, api.NewErrorInternal("onchain state is nil")
Expand All @@ -25,6 +30,11 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl
return nil, err
}

finishedValidation := time.Now()
s.metrics.reportValidateDispersalRequestLatency(finishedValidation.Sub(start))

s.metrics.reportDisperseBlobSize(len(req.GetData()))

data := req.GetData()
blobHeader, err := corev2.BlobHeaderFromProtobuf(req.GetBlobHeader())
if err != nil {
Expand All @@ -37,6 +47,8 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl
return nil, err
}

s.metrics.reportStoreBlobLatency(time.Since(finishedValidation))

return &pb.DisperseBlobReply{
Result: dispv2.Queued.ToProfobuf(),
BlobKey: blobKey[:],
Expand Down
6 changes: 6 additions & 0 deletions disperser/apiserver/get_blob_status_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/Layr-Labs/eigenda/api"
pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
Expand All @@ -13,6 +14,11 @@ import (
)

func (s *DispersalServerV2) GetBlobStatus(ctx context.Context, req *pb.BlobStatusRequest) (*pb.BlobStatusReply, error) {
start := time.Now()
defer func() {
s.metrics.reportGetBlobStatusLatency(time.Since(start))
}()

if req.GetBlobKey() == nil || len(req.GetBlobKey()) != 32 {
return nil, api.NewErrorInvalidArg("invalid blob key")
}
Expand Down
145 changes: 145 additions & 0 deletions disperser/apiserver/metrics_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package apiserver

import (
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"time"
)

const namespace = "eigenda_disperser_api"

// metricsV2 encapsulates the metrics for the v2 API server.
type metricsV2 struct {
grpcServerOption grpc.ServerOption

getBlobCommitmentLatency *prometheus.SummaryVec
getPaymentStateLatency *prometheus.SummaryVec
disperseBlobLatency *prometheus.SummaryVec
disperseBlobSize *prometheus.GaugeVec
validateDispersalRequestLatency *prometheus.SummaryVec
storeBlobLatency *prometheus.SummaryVec
getBlobStatusLatency *prometheus.SummaryVec
}

// newAPIServerV2Metrics creates a new metricsV2 instance.
func newAPIServerV2Metrics(registry *prometheus.Registry) *metricsV2 {
grpcMetrics := grpcprom.NewServerMetrics()
registry.MustRegister(grpcMetrics)

grpcServerOption := grpc.UnaryInterceptor(
grpcMetrics.UnaryServerInterceptor(),
)

objectives := map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}

getBlobCommitmentLatency := promauto.With(registry).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "get_blob_commitment_latency_ms",
Help: "The time required to get the blob commitment.",
Objectives: objectives,
},
[]string{},
)

getPaymentStateLatency := promauto.With(registry).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "get_payment_state_latency_ms",
Help: "The time required to get the payment state.",
Objectives: objectives,
},
[]string{},
)

disperseBlobLatency := promauto.With(registry).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "disperse_blob_latency_ms",
Help: "The time required to disperse a blob.",
Objectives: objectives,
},
[]string{},
)

disperseBlobSize := promauto.With(registry).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "disperse_blob_size_bytes",
Help: "The size of the blob in bytes.",
},
[]string{},
)

validateDispersalRequestLatency := promauto.With(registry).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "validate_dispersal_request_latency_ms",
Help: "The time required to validate a dispersal request.",
Objectives: objectives,
},
[]string{},
)

storeBlobLatency := promauto.With(registry).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "store_blob_latency_ms",
Help: "The time required to store a blob.",
Objectives: objectives,
},
[]string{},
)

getBlobStatusLatency := promauto.With(registry).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "get_blob_status_latency_ms",
Help: "The time required to get the blob status.",
Objectives: objectives,
},
[]string{},
)

return &metricsV2{
grpcServerOption: grpcServerOption,
getBlobCommitmentLatency: getBlobCommitmentLatency,
getPaymentStateLatency: getPaymentStateLatency,
disperseBlobLatency: disperseBlobLatency,
disperseBlobSize: disperseBlobSize,
validateDispersalRequestLatency: validateDispersalRequestLatency,
storeBlobLatency: storeBlobLatency,
getBlobStatusLatency: getBlobStatusLatency,
}
}

func (m *metricsV2) reportGetBlobCommitmentLatency(duration time.Duration) {
m.getBlobCommitmentLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
}

func (m *metricsV2) reportGetPaymentStateLatency(duration time.Duration) {
m.getPaymentStateLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
}

func (m *metricsV2) reportDisperseBlobLatency(duration time.Duration) {
m.disperseBlobLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
}

func (m *metricsV2) reportDisperseBlobSize(size int) {
m.disperseBlobSize.WithLabelValues().Set(float64(size))
}

func (m *metricsV2) reportValidateDispersalRequestLatency(duration time.Duration) {
m.validateDispersalRequestLatency.WithLabelValues().Observe(
float64(duration.Nanoseconds()) / float64(time.Millisecond))
}

func (m *metricsV2) reportStoreBlobLatency(duration time.Duration) {
m.storeBlobLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
}

func (m *metricsV2) reportGetBlobStatusLatency(duration time.Duration) {
m.getBlobStatusLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
}
24 changes: 19 additions & 5 deletions disperser/apiserver/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"net"
"sync/atomic"
"time"
Expand All @@ -12,8 +13,7 @@ import (
pbcommon "github.com/Layr-Labs/eigenda/api/grpc/common"
pbv1 "github.com/Layr-Labs/eigenda/api/grpc/disperser"
pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
"github.com/Layr-Labs/eigenda/common"
healthcheck "github.com/Layr-Labs/eigenda/common/healthcheck"
"github.com/Layr-Labs/eigenda/common/healthcheck"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/meterer"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
Expand Down Expand Up @@ -56,22 +56,23 @@ type DispersalServerV2 struct {
onchainState atomic.Pointer[OnchainState]
maxNumSymbolsPerBlob uint64
onchainStateRefreshInterval time.Duration

metrics *metricsV2
}

// NewDispersalServerV2 creates a new Server struct with the provided parameters.
func NewDispersalServerV2(
serverConfig disperser.ServerConfig,
rateConfig RateConfig,
blobStore *blobstore.BlobStore,
blobMetadataStore *blobstore.BlobMetadataStore,
chainReader core.Reader,
ratelimiter common.RateLimiter,
meterer *meterer.Meterer,
authenticator corev2.BlobRequestAuthenticator,
prover encoding.Prover,
maxNumSymbolsPerBlob uint64,
onchainStateRefreshInterval time.Duration,
_logger logging.Logger,
registry *prometheus.Registry,
) *DispersalServerV2 {
logger := _logger.With("component", "DispersalServerV2")

Expand All @@ -88,6 +89,8 @@ func NewDispersalServerV2(

maxNumSymbolsPerBlob: maxNumSymbolsPerBlob,
onchainStateRefreshInterval: onchainStateRefreshInterval,

metrics: newAPIServerV2Metrics(registry),
}
}

Expand All @@ -101,7 +104,7 @@ func (s *DispersalServerV2) Start(ctx context.Context) error {

opt := grpc.MaxRecvMsgSize(1024 * 1024 * 300) // 300 MiB

gs := grpc.NewServer(opt)
gs := grpc.NewServer(opt, s.metrics.grpcServerOption)
reflection.Register(gs)
pb.RegisterDisperserServer(gs, s)

Expand Down Expand Up @@ -142,6 +145,11 @@ func (s *DispersalServerV2) Start(ctx context.Context) error {
}

func (s *DispersalServerV2) GetBlobCommitment(ctx context.Context, req *pb.BlobCommitmentRequest) (*pb.BlobCommitmentReply, error) {
start := time.Now()
defer func() {
s.metrics.reportGetBlobCommitmentLatency(time.Since(start))
}()

if s.prover == nil {
return nil, api.NewErrorUnimplemented()
}
Expand Down Expand Up @@ -223,7 +231,13 @@ func (s *DispersalServerV2) RefreshOnchainState(ctx context.Context) error {
}

func (s *DispersalServerV2) GetPaymentState(ctx context.Context, req *pb.GetPaymentStateRequest) (*pb.GetPaymentStateReply, error) {
start := time.Now()
defer func() {
s.metrics.reportGetPaymentStateLatency(time.Since(start))
}()

accountID := gethcommon.HexToAddress(req.AccountId)

// validate the signature
if err := s.authenticator.AuthenticatePaymentStateRequest(req.GetSignature(), req.GetAccountId()); err != nil {
return nil, api.NewErrorInvalidArg(fmt.Sprintf("authentication failed: %s", err.Error()))
Expand Down
21 changes: 16 additions & 5 deletions disperser/apiserver/server_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"math/big"
"net"
"testing"
Expand Down Expand Up @@ -430,7 +431,6 @@ func newTestServerV2(t *testing.T) *testComponents {
blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, v2MetadataTableName)
blobStore := blobstore.NewBlobStore(s3BucketName, s3Client, logger)
chainReader := &mock.MockWriter{}
rateConfig := apiserver.RateConfig{}

// append test name to each table name for an unique store
mockState := &mock.MockOnchainPaymentState{}
Expand Down Expand Up @@ -491,10 +491,21 @@ func newTestServerV2(t *testing.T) *testComponents {
},
}, nil)

s := apiserver.NewDispersalServerV2(disperser.ServerConfig{
GrpcPort: "51002",
GrpcTimeout: 1 * time.Second,
}, rateConfig, blobStore, blobMetadataStore, chainReader, nil, meterer, auth.NewAuthenticator(), prover, 10, time.Hour, logger)
s := apiserver.NewDispersalServerV2(
disperser.ServerConfig{
GrpcPort: "51002",
GrpcTimeout: 1 * time.Second,
},
blobStore,
blobMetadataStore,
chainReader,
meterer,
auth.NewAuthenticator(),
prover,
10,
time.Hour,
logger,
prometheus.NewRegistry())

err = s.RefreshOnchainState(context.Background())
assert.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions disperser/cmd/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,16 @@ func RunDisperserServer(ctx *cli.Context) error {

server := apiserver.NewDispersalServerV2(
config.ServerConfig,
config.RateConfig,
blobStore,
blobMetadataStore,
transactor,
ratelimiter,
meterer,
authv2.NewAuthenticator(),
prover,
uint64(config.MaxNumSymbolsPerBlob),
config.OnchainStateRefreshInterval,
logger,
reg,
)
return server.Start(context.Background())
}
Expand Down Expand Up @@ -210,6 +209,7 @@ func RunDisperserServer(ctx *cli.Context) error {
// Enable Metrics Block
if config.MetricsConfig.EnableMetrics {
httpSocket := fmt.Sprintf(":%s", config.MetricsConfig.HTTPPort)
// TODO(cody-littley): once we deprecate v1, move all remaining metrics functionality to metrics_v2.go
metrics.Start(context.Background())
logger.Info("Enabled metrics for Disperser", "socket", httpSocket)
}
Expand Down

0 comments on commit 4a98d98

Please sign in to comment.