Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics to v2 api server. #969

Merged
merged 3 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions disperser/apiserver/disperse_blob_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ import (
)

func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBlobRequest) (*pb.DisperseBlobReply, error) {
start := time.Now()
defer func() {
s.metrics.reportDisperseBlobLatency(time.Since(start))
}()

onchainState := s.onchainState.Load()
if onchainState == nil {
return nil, api.NewErrorInternal("onchain state is nil")
Expand All @@ -25,6 +30,11 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl
return nil, err
}

finishedValidation := time.Now()
s.metrics.reportValidateDispersalRequestLatency(finishedValidation.Sub(start))

s.metrics.reportDisperseBlobSize(len(req.GetData()))

data := req.GetData()
blobHeader, err := corev2.BlobHeaderFromProtobuf(req.GetBlobHeader())
if err != nil {
Expand All @@ -37,6 +47,8 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl
return nil, err
}

s.metrics.reportStoreBlobLatency(time.Since(finishedValidation))

return &pb.DisperseBlobReply{
Result: dispv2.Queued.ToProfobuf(),
BlobKey: blobKey[:],
Expand Down
6 changes: 6 additions & 0 deletions disperser/apiserver/get_blob_status_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/Layr-Labs/eigenda/api"
pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
Expand All @@ -13,6 +14,11 @@ import (
)

func (s *DispersalServerV2) GetBlobStatus(ctx context.Context, req *pb.BlobStatusRequest) (*pb.BlobStatusReply, error) {
start := time.Now()
defer func() {
s.metrics.reportGetBlobStatusLatency(time.Since(start))
}()

if req.GetBlobKey() == nil || len(req.GetBlobKey()) != 32 {
return nil, api.NewErrorInvalidArg("invalid blob key")
}
Expand Down
145 changes: 145 additions & 0 deletions disperser/apiserver/metrics_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package apiserver

import (
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"time"
)

// namespace is the Prometheus namespace used for each metric created by newAPIServerV2Metrics.
const namespace = "eigenda_disperser_api"

// metricsV2 encapsulates the metrics for the v2 API server.
//
// Instances are built by newAPIServerV2Metrics; the report* methods on this type are the
// intended way to record observations.
type metricsV2 struct {
// grpcServerOption installs the unary interceptor that feeds the gRPC server metrics.
grpcServerOption grpc.ServerOption

// Latency summaries. The report* methods convert durations to milliseconds before observing.
getBlobCommitmentLatency *prometheus.SummaryVec
getPaymentStateLatency *prometheus.SummaryVec
disperseBlobLatency *prometheus.SummaryVec
// disperseBlobSize holds a blob size in bytes; being a gauge, it keeps only the last value set.
disperseBlobSize *prometheus.GaugeVec
validateDispersalRequestLatency *prometheus.SummaryVec
storeBlobLatency *prometheus.SummaryVec
getBlobStatusLatency *prometheus.SummaryVec
}

// newAPIServerV2Metrics builds the metricsV2 collection for the v2 API server and registers
// every collector with the supplied registry.
//
// The gRPC server metrics are registered via MustRegister and all other collectors are created
// through promauto, so a failed registration (e.g. a duplicate collector on the same registry)
// panics instead of returning an error.
func newAPIServerV2Metrics(registry *prometheus.Registry) *metricsV2 {
	serverMetrics := grpcprom.NewServerMetrics()
	registry.MustRegister(serverMetrics)

	factory := promauto.With(registry)

	// Quantile targets (quantile -> allowed error) shared by every latency summary.
	quantiles := map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}

	// latencySummary creates and registers an unlabeled latency summary in this file's namespace.
	latencySummary := func(name string, help string) *prometheus.SummaryVec {
		return factory.NewSummaryVec(
			prometheus.SummaryOpts{
				Namespace:  namespace,
				Name:       name,
				Help:       help,
				Objectives: quantiles,
			},
			[]string{},
		)
	}

	// Field expressions are evaluated top to bottom, so collectors are created in the order listed.
	return &metricsV2{
		grpcServerOption: grpc.UnaryInterceptor(serverMetrics.UnaryServerInterceptor()),
		getBlobCommitmentLatency: latencySummary(
			"get_blob_commitment_latency_ms",
			"The time required to get the blob commitment.",
		),
		getPaymentStateLatency: latencySummary(
			"get_payment_state_latency_ms",
			"The time required to get the payment state.",
		),
		disperseBlobLatency: latencySummary(
			"disperse_blob_latency_ms",
			"The time required to disperse a blob.",
		),
		disperseBlobSize: factory.NewGaugeVec(
			prometheus.GaugeOpts{
				Namespace: namespace,
				Name:      "disperse_blob_size_bytes",
				Help:      "The size of the blob in bytes.",
			},
			[]string{},
		),
		validateDispersalRequestLatency: latencySummary(
			"validate_dispersal_request_latency_ms",
			"The time required to validate a dispersal request.",
		),
		storeBlobLatency: latencySummary(
			"store_blob_latency_ms",
			"The time required to store a blob.",
		),
		getBlobStatusLatency: latencySummary(
			"get_blob_status_latency_ms",
			"The time required to get the blob status.",
		),
	}
}

// reportGetBlobCommitmentLatency adds one observation to the blob commitment latency summary.
// The duration is converted to (fractional) milliseconds before it is observed.
func (m *metricsV2) reportGetBlobCommitmentLatency(duration time.Duration) {
	millis := float64(duration) / float64(time.Millisecond)
	m.getBlobCommitmentLatency.WithLabelValues().Observe(millis)
}

// reportGetPaymentStateLatency adds one observation to the payment state latency summary.
// The duration is converted to (fractional) milliseconds before it is observed.
func (m *metricsV2) reportGetPaymentStateLatency(duration time.Duration) {
	millis := float64(duration) / float64(time.Millisecond)
	m.getPaymentStateLatency.WithLabelValues().Observe(millis)
}

// reportDisperseBlobLatency adds one observation to the disperse blob latency summary.
// The duration is converted to (fractional) milliseconds before it is observed.
func (m *metricsV2) reportDisperseBlobLatency(duration time.Duration) {
	millis := float64(duration) / float64(time.Millisecond)
	m.disperseBlobLatency.WithLabelValues().Observe(millis)
}

// reportDisperseBlobSize sets the blob size gauge to the given size in bytes.
// Because the metric is a gauge, each call overwrites the previously reported value.
func (m *metricsV2) reportDisperseBlobSize(size int) {
	sizeGauge := m.disperseBlobSize.WithLabelValues()
	sizeGauge.Set(float64(size))
}

// reportValidateDispersalRequestLatency adds one observation to the dispersal request
// validation latency summary. The duration is converted to (fractional) milliseconds first.
func (m *metricsV2) reportValidateDispersalRequestLatency(duration time.Duration) {
	millis := float64(duration) / float64(time.Millisecond)
	m.validateDispersalRequestLatency.WithLabelValues().Observe(millis)
}

// reportStoreBlobLatency adds one observation to the store blob latency summary.
// The duration is converted to (fractional) milliseconds before it is observed.
func (m *metricsV2) reportStoreBlobLatency(duration time.Duration) {
	millis := float64(duration) / float64(time.Millisecond)
	m.storeBlobLatency.WithLabelValues().Observe(millis)
}

// reportGetBlobStatusLatency adds one observation to the blob status latency summary.
// The duration is converted to (fractional) milliseconds before it is observed.
func (m *metricsV2) reportGetBlobStatusLatency(duration time.Duration) {
	millis := float64(duration) / float64(time.Millisecond)
	m.getBlobStatusLatency.WithLabelValues().Observe(millis)
}
24 changes: 19 additions & 5 deletions disperser/apiserver/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"net"
"sync/atomic"
"time"
Expand All @@ -12,8 +13,7 @@ import (
pbcommon "github.com/Layr-Labs/eigenda/api/grpc/common"
pbv1 "github.com/Layr-Labs/eigenda/api/grpc/disperser"
pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
"github.com/Layr-Labs/eigenda/common"
healthcheck "github.com/Layr-Labs/eigenda/common/healthcheck"
"github.com/Layr-Labs/eigenda/common/healthcheck"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/meterer"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
Expand Down Expand Up @@ -56,22 +56,23 @@ type DispersalServerV2 struct {
onchainState atomic.Pointer[OnchainState]
maxNumSymbolsPerBlob uint64
onchainStateRefreshInterval time.Duration

metrics *metricsV2
}

// NewDispersalServerV2 creates a new Server struct with the provided parameters.
func NewDispersalServerV2(
serverConfig disperser.ServerConfig,
rateConfig RateConfig,
blobStore *blobstore.BlobStore,
blobMetadataStore *blobstore.BlobMetadataStore,
chainReader core.Reader,
ratelimiter common.RateLimiter,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙇

meterer *meterer.Meterer,
authenticator corev2.BlobRequestAuthenticator,
prover encoding.Prover,
maxNumSymbolsPerBlob uint64,
onchainStateRefreshInterval time.Duration,
_logger logging.Logger,
registry *prometheus.Registry,
) *DispersalServerV2 {
logger := _logger.With("component", "DispersalServerV2")

Expand All @@ -88,6 +89,8 @@ func NewDispersalServerV2(

maxNumSymbolsPerBlob: maxNumSymbolsPerBlob,
onchainStateRefreshInterval: onchainStateRefreshInterval,

metrics: newAPIServerV2Metrics(registry),
}
}

Expand All @@ -101,7 +104,7 @@ func (s *DispersalServerV2) Start(ctx context.Context) error {

opt := grpc.MaxRecvMsgSize(1024 * 1024 * 300) // 300 MiB

gs := grpc.NewServer(opt)
gs := grpc.NewServer(opt, s.metrics.grpcServerOption)
reflection.Register(gs)
pb.RegisterDisperserServer(gs, s)

Expand Down Expand Up @@ -142,6 +145,11 @@ func (s *DispersalServerV2) Start(ctx context.Context) error {
}

func (s *DispersalServerV2) GetBlobCommitment(ctx context.Context, req *pb.BlobCommitmentRequest) (*pb.BlobCommitmentReply, error) {
start := time.Now()
defer func() {
s.metrics.reportGetBlobCommitmentLatency(time.Since(start))
}()

if s.prover == nil {
return nil, api.NewErrorUnimplemented()
}
Expand Down Expand Up @@ -223,7 +231,13 @@ func (s *DispersalServerV2) RefreshOnchainState(ctx context.Context) error {
}

func (s *DispersalServerV2) GetPaymentState(ctx context.Context, req *pb.GetPaymentStateRequest) (*pb.GetPaymentStateReply, error) {
start := time.Now()
defer func() {
s.metrics.reportGetPaymentStateLatency(time.Since(start))
}()

accountID := gethcommon.HexToAddress(req.AccountId)

// validate the signature
if err := s.authenticator.AuthenticatePaymentStateRequest(req.GetSignature(), req.GetAccountId()); err != nil {
return nil, api.NewErrorInvalidArg(fmt.Sprintf("authentication failed: %s", err.Error()))
Expand Down
21 changes: 16 additions & 5 deletions disperser/apiserver/server_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"math/big"
"net"
"testing"
Expand Down Expand Up @@ -430,7 +431,6 @@ func newTestServerV2(t *testing.T) *testComponents {
blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, v2MetadataTableName)
blobStore := blobstore.NewBlobStore(s3BucketName, s3Client, logger)
chainReader := &mock.MockWriter{}
rateConfig := apiserver.RateConfig{}

// append test name to each table name for a unique store
mockState := &mock.MockOnchainPaymentState{}
Expand Down Expand Up @@ -491,10 +491,21 @@ func newTestServerV2(t *testing.T) *testComponents {
},
}, nil)

s := apiserver.NewDispersalServerV2(disperser.ServerConfig{
GrpcPort: "51002",
GrpcTimeout: 1 * time.Second,
}, rateConfig, blobStore, blobMetadataStore, chainReader, nil, meterer, auth.NewAuthenticator(), prover, 10, time.Hour, logger)
s := apiserver.NewDispersalServerV2(
disperser.ServerConfig{
GrpcPort: "51002",
GrpcTimeout: 1 * time.Second,
},
blobStore,
blobMetadataStore,
chainReader,
meterer,
auth.NewAuthenticator(),
prover,
10,
time.Hour,
logger,
prometheus.NewRegistry())

err = s.RefreshOnchainState(context.Background())
assert.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions disperser/cmd/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,16 @@ func RunDisperserServer(ctx *cli.Context) error {

server := apiserver.NewDispersalServerV2(
config.ServerConfig,
config.RateConfig,
blobStore,
blobMetadataStore,
transactor,
ratelimiter,
meterer,
authv2.NewAuthenticator(),
prover,
uint64(config.MaxNumSymbolsPerBlob),
config.OnchainStateRefreshInterval,
logger,
reg,
)
return server.Start(context.Background())
}
Expand Down Expand Up @@ -210,6 +209,7 @@ func RunDisperserServer(ctx *cli.Context) error {
// Enable Metrics Block
if config.MetricsConfig.EnableMetrics {
httpSocket := fmt.Sprintf(":%s", config.MetricsConfig.HTTPPort)
// TODO(cody-littley): once we deprecate v1, move all remaining metrics functionality to metrics_v2.go
metrics.Start(context.Background())
logger.Info("Enabled metrics for Disperser", "socket", httpSocket)
}
Expand Down
Loading