diff --git a/common/common.go b/common/common.go index 90baeb87e6..1105f0ca09 100644 --- a/common/common.go +++ b/common/common.go @@ -3,6 +3,7 @@ package common import ( "bytes" "crypto/sha256" + "time" "unsafe" "github.com/fxamacker/cbor/v2" @@ -62,3 +63,9 @@ func DecodeFromBytes[T any](b []byte) (T, error) { } return t, nil } + +// ToMilliseconds converts the given duration to milliseconds. Unlike duration.Milliseconds(), this function returns +// a float64 with nanosecond precision (at least, as much precision as floating point numbers allow). +func ToMilliseconds(duration time.Duration) float64 { + return float64(duration.Nanoseconds()) / float64(time.Millisecond) +} diff --git a/disperser/apiserver/metrics_v2.go b/disperser/apiserver/metrics_v2.go index 25f9e7fd3c..9273aa9d52 100644 --- a/disperser/apiserver/metrics_v2.go +++ b/disperser/apiserver/metrics_v2.go @@ -1,6 +1,7 @@ package apiserver import ( + "github.com/Layr-Labs/eigenda/common" grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -116,15 +117,15 @@ func newAPIServerV2Metrics(registry *prometheus.Registry) *metricsV2 { } func (m *metricsV2) reportGetBlobCommitmentLatency(duration time.Duration) { - m.getBlobCommitmentLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.getBlobCommitmentLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *metricsV2) reportGetPaymentStateLatency(duration time.Duration) { - m.getPaymentStateLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.getPaymentStateLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *metricsV2) reportDisperseBlobLatency(duration time.Duration) { - m.disperseBlobLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.disperseBlobLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *metricsV2) reportDisperseBlobSize(size int) { @@ -133,13 +134,13 @@ func (m *metricsV2) reportDisperseBlobSize(size int) { func (m *metricsV2) reportValidateDispersalRequestLatency(duration time.Duration) { m.validateDispersalRequestLatency.WithLabelValues().Observe( - float64(duration.Nanoseconds()) / float64(time.Millisecond)) + common.ToMilliseconds(duration)) } func (m *metricsV2) reportStoreBlobLatency(duration time.Duration) { - m.storeBlobLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.storeBlobLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *metricsV2) reportGetBlobStatusLatency(duration time.Duration) { - m.getBlobStatusLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.getBlobStatusLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } diff --git a/disperser/controller/dispatcher_metrics.go b/disperser/controller/dispatcher_metrics.go index f8a667a6e8..148342198f 100644 --- a/disperser/controller/dispatcher_metrics.go +++ b/disperser/controller/dispatcher_metrics.go @@ -1,6 +1,7 @@ package controller import ( + "github.com/Layr-Labs/eigenda/common" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "time" @@ -248,51 +249,51 @@ func newDispatcherMetrics(registry *prometheus.Registry) *dispatcherMetrics { } func (m *dispatcherMetrics) reportHandleBatchLatency(duration time.Duration) { - m.handleBatchLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.handleBatchLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *dispatcherMetrics) reportNewBatchLatency(duration time.Duration) { - m.newBatchLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.newBatchLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *dispatcherMetrics) reportGetBlobMetadataLatency(duration time.Duration) { - m.getBlobMetadataLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.getBlobMetadataLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *dispatcherMetrics) reportGetOperatorStateLatency(duration time.Duration) { - m.getOperatorStateLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.getOperatorStateLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *dispatcherMetrics) reportGetBlobCertificatesLatency(duration time.Duration) { - m.getBlobCertificatesLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.getBlobCertificatesLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *dispatcherMetrics) reportBuildMerkleTreeLatency(duration time.Duration) { - m.buildMerkleTreeLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.buildMerkleTreeLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *dispatcherMetrics) reportPutBatchHeaderLatency(duration time.Duration) { - m.putBatchHeaderLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.putBatchHeaderLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *dispatcherMetrics) reportProofLatency(duration time.Duration) { - m.proofLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.proofLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *dispatcherMetrics) reportPutVerificationInfosLatency(duration time.Duration) { - m.putVerificationInfosLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.putVerificationInfosLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *dispatcherMetrics) reportPoolSubmissionLatency(duration time.Duration) { - m.poolSubmissionLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.poolSubmissionLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *dispatcherMetrics) reportPutDispersalRequestLatency(duration time.Duration) { - m.putDispersalRequestLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.putDispersalRequestLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *dispatcherMetrics) reportSendChunksLatency(duration time.Duration) { - m.sendChunksLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.sendChunksLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *dispatcherMetrics) reportSendChunksRetryCount(retries float64) { @@ -300,25 +301,25 @@ func (m *dispatcherMetrics) reportSendChunksRetryCount(retries float64) { } func (m *dispatcherMetrics) reportPutDispersalResponseLatency(duration time.Duration) { - m.putDispersalResponseLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.putDispersalResponseLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *dispatcherMetrics) reportHandleSignaturesLatency(duration time.Duration) { - m.handleSignaturesLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.handleSignaturesLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *dispatcherMetrics) reportReceiveSignaturesLatency(duration time.Duration) { - m.receiveSignaturesLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.receiveSignaturesLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *dispatcherMetrics) reportAggregateSignaturesLatency(duration time.Duration) { - m.aggregateSignaturesLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.aggregateSignaturesLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *dispatcherMetrics) reportPutAttestationLatency(duration time.Duration) { - m.putAttestationLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.putAttestationLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *dispatcherMetrics) reportUpdateBatchStatusLatency(duration time.Duration) { - m.updateBatchStatusLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.updateBatchStatusLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } diff --git a/disperser/controller/encoding_manager_metrics.go b/disperser/controller/encoding_manager_metrics.go index 28c34a0ed1..7911932039 100644 --- a/disperser/controller/encoding_manager_metrics.go +++ b/disperser/controller/encoding_manager_metrics.go @@ -1,6 +1,7 @@ package controller import ( + common "github.com/Layr-Labs/eigenda/common" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "time" @@ -123,23 +124,23 @@ func newEncodingManagerMetrics(registry *prometheus.Registry) *encodingManagerMe } func (m *encodingManagerMetrics) reportBatchSubmissionLatency(duration time.Duration) { - m.batchSubmissionLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.batchSubmissionLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *encodingManagerMetrics) reportBlobHandleLatency(duration time.Duration) { - m.blobHandleLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.blobHandleLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *encodingManagerMetrics) reportEncodingLatency(duration time.Duration) { - m.encodingLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.encodingLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *encodingManagerMetrics) reportPutBlobCertLatency(duration time.Duration) { - m.putBlobCertLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.putBlobCertLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *encodingManagerMetrics) reportUpdateBlobStatusLatency(duration time.Duration) { - m.updateBlobStatusLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.updateBlobStatusLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *encodingManagerMetrics) reportBatchSize(size int) { diff --git a/node/cmd/main.go b/node/cmd/main.go index 0225b33d71..a54e7bdfdb 100644 --- a/node/cmd/main.go +++ b/node/cmd/main.go @@ -85,8 +85,14 @@ func NodeMain(ctx *cli.Context) error { } // Creates the GRPC server. + + // TODO(cody-littley): the metrics server is currently started by eigenmetrics, which is in another repo. + // When we fully remove v1 support, we need to start the metrics server inside the v2 metrics code. server := nodegrpc.NewServer(config, node, logger, ratelimiter) - serverV2 := nodegrpc.NewServerV2(config, node, logger, ratelimiter) + serverV2, err := nodegrpc.NewServerV2(config, node, logger, ratelimiter, reg) + if err != nil { + return fmt.Errorf("failed to create server v2: %v", err) + } err = nodegrpc.RunServers(server, serverV2, config, logger) return err diff --git a/node/config.go b/node/config.go index fd8d530853..7bf051efdd 100644 --- a/node/config.go +++ b/node/config.go @@ -52,7 +52,7 @@ type Config struct { EnableNodeApi bool NodeApiPort string EnableMetrics bool - MetricsPort string + MetricsPort int OnchainMetricsInterval int64 Timeout time.Duration RegisterNodeAtStart bool @@ -207,7 +207,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) { EnableNodeApi: ctx.GlobalBool(flags.EnableNodeApiFlag.Name), NodeApiPort: ctx.GlobalString(flags.NodeApiPortFlag.Name), EnableMetrics: ctx.GlobalBool(flags.EnableMetricsFlag.Name), - MetricsPort: ctx.GlobalString(flags.MetricsPortFlag.Name), + MetricsPort: ctx.GlobalInt(flags.MetricsPortFlag.Name), OnchainMetricsInterval: ctx.GlobalInt64(flags.OnchainMetricsIntervalFlag.Name), Timeout: timeout, RegisterNodeAtStart: registerNodeAtStart, diff --git a/node/flags/flags.go b/node/flags/flags.go index ac0bbbae87..79e1e7feb6 100644 --- a/node/flags/flags.go +++ b/node/flags/flags.go @@ -66,11 +66,11 @@ var ( Required: true, EnvVar: common.PrefixEnvVar(EnvVarPrefix, "ENABLE_METRICS"), } - MetricsPortFlag = cli.StringFlag{ + MetricsPortFlag = cli.IntFlag{ Name: common.PrefixFlag(FlagPrefix, "metrics-port"), Usage: "Port at which node listens for metrics calls", Required: false, - Value: "9091", + Value: 9091, EnvVar: common.PrefixEnvVar(EnvVarPrefix, "METRICS_PORT"), } OnchainMetricsIntervalFlag = cli.StringFlag{ diff --git a/node/grpc/metrics_v2.go b/node/grpc/metrics_v2.go new file mode 100644 index 0000000000..6dcfb52d4a --- /dev/null +++ b/node/grpc/metrics_v2.go @@ -0,0 +1,111 @@ +package grpc + +import ( + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigensdk-go/logging" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "google.golang.org/grpc" + "time" +) + +const namespace = "eigenda_node" + +// MetricsV2 encapsulates metrics for the v2 DA node. +type MetricsV2 struct { + logger logging.Logger + + registry *prometheus.Registry + grpcServerOption grpc.ServerOption + + storeChunksLatency *prometheus.SummaryVec + storeChunksRequestSize *prometheus.GaugeVec + + getChunksLatency *prometheus.SummaryVec + getChunksDataSize *prometheus.GaugeVec +} + +// NewV2Metrics creates a new MetricsV2 instance. dbSizePollPeriod is the period at which the database size is polled. +// If set to 0, the database size is not polled. +func NewV2Metrics(logger logging.Logger, registry *prometheus.Registry) (*MetricsV2, error) { + + // These should be re-enabled once the legacy v1 metrics are removed. + //registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) + //registry.MustRegister(collectors.NewGoCollector()) + + grpcMetrics := grpcprom.NewServerMetrics() + registry.MustRegister(grpcMetrics) + grpcServerOption := grpc.UnaryInterceptor( + grpcMetrics.UnaryServerInterceptor(), + ) + + storeChunksLatency := promauto.With(registry).NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Name: "store_chunks_latency_ms", + Help: "The latency of a StoreChunks() RPC call.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, + []string{}, + ) + + storeChunksRequestSize := promauto.With(registry).NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "store_chunks_request_size_bytes", + Help: "The size of the data requested to be stored by StoreChunks() RPC calls.", + }, + []string{}, + ) + + getChunksLatency := promauto.With(registry).NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Name: "get_chunks_latency_ms", + Help: "The latency of a GetChunks() RPC call.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, + []string{}, + ) + + getChunksDataSize := promauto.With(registry).NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "get_chunks_data_size_bytes", + Help: "The size of the data requested to be retrieved by GetChunks() RPC calls.", + }, + []string{}, + ) + + return &MetricsV2{ + logger: logger, + registry: registry, + grpcServerOption: grpcServerOption, + storeChunksLatency: storeChunksLatency, + storeChunksRequestSize: storeChunksRequestSize, + getChunksLatency: getChunksLatency, + getChunksDataSize: getChunksDataSize, + }, nil +} + +// GetGRPCServerOption returns the gRPC server option that enables automatic GRPC metrics collection. +func (m *MetricsV2) GetGRPCServerOption() grpc.ServerOption { + return m.grpcServerOption +} + +func (m *MetricsV2) ReportStoreChunksLatency(latency time.Duration) { + m.storeChunksLatency.WithLabelValues().Observe(common.ToMilliseconds(latency)) +} + +func (m *MetricsV2) ReportStoreChunksRequestSize(size uint64) { + m.storeChunksRequestSize.WithLabelValues().Set(float64(size)) +} + +func (m *MetricsV2) ReportGetChunksLatency(latency time.Duration) { + m.getChunksLatency.WithLabelValues().Observe(common.ToMilliseconds(latency)) +} + +func (m *MetricsV2) ReportGetChunksDataSize(size int) { + m.getChunksDataSize.WithLabelValues().Set(float64(size)) +} diff --git a/node/grpc/run.go b/node/grpc/run.go index 5fea03d62c..cbcb2d74c6 100644 --- a/node/grpc/run.go +++ b/node/grpc/run.go @@ -33,7 +33,7 @@ func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logge } opt := grpc.MaxRecvMsgSize(60 * 1024 * 1024 * 1024) // 60 GiB - gs := grpc.NewServer(opt) + gs := grpc.NewServer(opt, serverV2.metrics.GetGRPCServerOption()) // Register reflection service on gRPC server // This makes "grpcurl -plaintext localhost:9000 list" command work @@ -60,7 +60,7 @@ func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logge } opt := grpc.MaxRecvMsgSize(1024 * 1024 * 300) // 300 MiB - gs := grpc.NewServer(opt) + gs := grpc.NewServer(opt, serverV2.metrics.GetGRPCServerOption()) // Register reflection service on gRPC server // This makes "grpcurl -plaintext localhost:9000 list" command work diff --git a/node/grpc/server_v2.go b/node/grpc/server_v2.go index 4f46a70d53..620267cd02 100644 --- a/node/grpc/server_v2.go +++ b/node/grpc/server_v2.go @@ -4,8 +4,6 @@ import ( "context" "encoding/hex" "fmt" - "runtime" - "github.com/Layr-Labs/eigenda/api" pb "github.com/Layr-Labs/eigenda/api/grpc/node/v2" "github.com/Layr-Labs/eigenda/common" @@ -14,7 +12,10 @@ import ( corev2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/node" "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/prometheus/client_golang/prometheus" "github.com/shirou/gopsutil/mem" + "runtime" + "time" ) // ServerV2 implements the Node v2 proto APIs. @@ -26,6 +27,7 @@ type ServerV2 struct { node *node.Node ratelimiter common.RateLimiter logger logging.Logger + metrics *MetricsV2 } // NewServerV2 creates a new Server instance with the provided parameters. @@ -34,13 +36,20 @@ func NewServerV2( node *node.Node, logger logging.Logger, ratelimiter common.RateLimiter, -) *ServerV2 { + registry *prometheus.Registry) (*ServerV2, error) { + + metrics, err := NewV2Metrics(logger, registry) + if err != nil { + return nil, err + } + return &ServerV2{ config: config, node: node, ratelimiter: ratelimiter, logger: logger, - } + metrics: metrics, + }, nil } func (s *ServerV2) NodeInfo(ctx context.Context, in *pb.NodeInfoRequest) (*pb.NodeInfoReply, error) { @@ -58,6 +67,8 @@ func (s *ServerV2) NodeInfo(ctx context.Context, in *pb.NodeInfoRequest) (*pb.No } func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*pb.StoreChunksReply, error) { + start := time.Now() + if !s.config.EnableV2 { return nil, api.NewErrorInvalidArg("v2 API is disabled") } @@ -92,7 +103,7 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) ( } storeChan := make(chan storeResult) go func() { - keys, err := s.node.StoreV2.StoreBatch(batch, rawBundles) + keys, size, err := s.node.StoreV2.StoreBatch(batch, rawBundles) if err != nil { storeChan <- storeResult{ keys: nil, @@ -101,6 +112,8 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) ( return } + s.metrics.ReportStoreChunksRequestSize(size) + storeChan <- storeResult{ keys: keys, err: nil, @@ -124,6 +137,9 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) ( } sig := s.node.KeyPair.SignMessage(batchHeaderHash).Bytes() + + s.metrics.ReportStoreChunksLatency(time.Since(start)) + return &pb.StoreChunksReply{ Signature: sig[:], }, nil @@ -144,6 +160,8 @@ func (s *ServerV2) validateStoreChunksRequest(req *pb.StoreChunksRequest) (*core } func (s *ServerV2) GetChunks(ctx context.Context, in *pb.GetChunksRequest) (*pb.GetChunksReply, error) { + start := time.Now() + if !s.config.EnableV2 { return nil, api.NewErrorInvalidArg("v2 API is disabled") } @@ -166,6 +184,14 @@ func (s *ServerV2) GetChunks(ctx context.Context, in *pb.GetChunksRequest) (*pb. return nil, api.NewErrorInternal(fmt.Sprintf("failed to get chunks: %v", err)) } + size := 0 + if len(chunks) > 0 { + size = len(chunks[0]) * len(chunks) + } + s.metrics.ReportGetChunksDataSize(size) + + s.metrics.ReportGetChunksLatency(time.Since(start)) + return &pb.GetChunksReply{ Chunks: chunks, }, nil diff --git a/node/grpc/server_v2_test.go b/node/grpc/server_v2_test.go index 11ef1fce3e..cbfd9adcdf 100644 --- a/node/grpc/server_v2_test.go +++ b/node/grpc/server_v2_test.go @@ -82,7 +82,8 @@ func newTestComponents(t *testing.T, config *node.Config) *testComponents { RelayClient: atomicRelayClient, } node.BlobVersionParams.Store(v2.NewBlobVersionParameterMap(blobParamsMap)) - server := grpc.NewServerV2(config, node, logger, ratelimiter) + server, err := grpc.NewServerV2(config, node, logger, ratelimiter, prometheus.NewRegistry()) + require.NoError(t, err) return &testComponents{ server: server, node: node, diff --git a/node/mock/store_v2.go b/node/mock/store_v2.go index e7d473c3d4..a14a034ee0 100644 --- a/node/mock/store_v2.go +++ b/node/mock/store_v2.go @@ -19,12 +19,12 @@ func NewMockStoreV2() *MockStoreV2 { return &MockStoreV2{} } -func (m *MockStoreV2) StoreBatch(batch *corev2.Batch, rawBundles []*node.RawBundles) ([]kvstore.Key, error) { +func (m *MockStoreV2) StoreBatch(batch *corev2.Batch, rawBundles []*node.RawBundles) ([]kvstore.Key, uint64, error) { args := m.Called(batch, rawBundles) if args.Get(0) == nil { - return nil, args.Error(1) + return nil, 0, args.Error(1) } - return args.Get(0).([]kvstore.Key), args.Error(1) + return args.Get(0).([]kvstore.Key), 0, args.Error(1) } func (m *MockStoreV2) DeleteKeys(keys []kvstore.Key) error { diff --git a/node/node.go b/node/node.go index 3e36d5f787..155a585be3 100644 --- a/node/node.go +++ b/node/node.go @@ -104,7 +104,7 @@ func NewNode( nodeLogger := logger.With("component", "Node") - eigenMetrics := metrics.NewEigenMetrics(AppName, ":"+config.MetricsPort, reg, logger.With("component", "EigenMetrics")) + eigenMetrics := metrics.NewEigenMetrics(AppName, fmt.Sprintf(":%d", config.MetricsPort), reg, logger.With("component", "EigenMetrics")) rpcCallsCollector := rpccalls.NewCollector(AppName, reg) // Make sure config folder exists. @@ -176,7 +176,7 @@ func NewNode( // Setup Node Api nodeApi := nodeapi.NewNodeApi(AppName, SemVer, ":"+config.NodeApiPort, logger.With("component", "NodeApi")) - metrics := NewMetrics(eigenMetrics, reg, logger, ":"+config.MetricsPort, config.ID, config.OnchainMetricsInterval, tx, cst) + metrics := NewMetrics(eigenMetrics, reg, logger, fmt.Sprintf(":%d", config.MetricsPort), config.ID, config.OnchainMetricsInterval, tx, cst) // Make validator config.EncoderConfig.LoadG2Points = false diff --git a/node/store_v2.go b/node/store_v2.go index 82c46af83b..d7ff2c19fe 100644 --- a/node/store_v2.go +++ b/node/store_v2.go @@ -19,8 +19,19 @@ const ( ) type StoreV2 interface { - StoreBatch(batch *corev2.Batch, rawBundles []*RawBundles) ([]kvstore.Key, error) + + // StoreBatch stores a batch and its raw bundles in the database. Returns the keys of the stored data + // and the size of the stored data, in bytes. + // + // All modifications to the database within this method are performed atomically. + StoreBatch(batch *corev2.Batch, rawBundles []*RawBundles) ([]kvstore.Key, uint64, error) + + // DeleteKeys deletes the keys from local storage. + // + // All modifications to the database within this method are performed atomically. DeleteKeys(keys []kvstore.Key) error + + // GetChunks returns the chunks of a blob with the given blob key and quorum. GetChunks(blobKey corev2.BlobKey, quorum core.QuorumID) ([][]byte, error) } @@ -42,69 +53,72 @@ func NewLevelDBStoreV2(db kvstore.TableStore, logger logging.Logger, ttl time.Du } } -func (s *storeV2) StoreBatch(batch *corev2.Batch, rawBundles []*RawBundles) ([]kvstore.Key, error) { +func (s *storeV2) StoreBatch(batch *corev2.Batch, rawBundles []*RawBundles) ([]kvstore.Key, uint64, error) { if len(rawBundles) == 0 { - return nil, fmt.Errorf("no raw bundles") + return nil, 0, fmt.Errorf("no raw bundles") } if len(rawBundles) != len(batch.BlobCertificates) { - return nil, fmt.Errorf("mismatch between raw bundles (%d) and blob certificates (%d)", len(rawBundles), len(batch.BlobCertificates)) + return nil, 0, fmt.Errorf("mismatch between raw bundles (%d) and blob certificates (%d)", len(rawBundles), len(batch.BlobCertificates)) } dbBatch := s.db.NewTTLBatch() + var size uint64 keys := make([]kvstore.Key, 0) batchHeaderKeyBuilder, err := s.db.GetKeyBuilder(BatchHeaderTableName) if err != nil { - return nil, fmt.Errorf("failed to get key builder for batch header: %v", err) + return nil, 0, fmt.Errorf("failed to get key builder for batch header: %v", err) } batchHeaderHash, err := batch.BatchHeader.Hash() if err != nil { - return nil, fmt.Errorf("failed to hash batch header: %v", err) + return nil, 0, fmt.Errorf("failed to hash batch header: %v", err) } // Store batch header batchHeaderKey := batchHeaderKeyBuilder.Key(batchHeaderHash[:]) if _, err = s.db.Get(batchHeaderKey); err == nil { - return nil, ErrBatchAlreadyExist + return nil, 0, ErrBatchAlreadyExist } batchHeaderBytes, err := batch.BatchHeader.Serialize() if err != nil { - return nil, fmt.Errorf("failed to serialize batch header: %v", err) + return nil, 0, fmt.Errorf("failed to serialize batch header: %v", err) } keys = append(keys, batchHeaderKey) dbBatch.PutWithTTL(batchHeaderKey, batchHeaderBytes, s.ttl) + size += uint64(len(batchHeaderBytes)) // Store blob shards for _, bundles := range rawBundles { blobKey, err := bundles.BlobCertificate.BlobHeader.BlobKey() if err != nil { - return nil, fmt.Errorf("failed to get blob key: %v", err) + return nil, 0, fmt.Errorf("failed to get blob key: %v", err) } // Store bundles for quorum, bundle := range bundles.Bundles { bundlesKeyBuilder, err := s.db.GetKeyBuilder(BundleTableName) if err != nil { - return nil, fmt.Errorf("failed to get key builder for bundles: %v", err) + return nil, 0, fmt.Errorf("failed to get key builder for bundles: %v", err) } k, err := BundleKey(blobKey, quorum) if err != nil { - return nil, fmt.Errorf("failed to get key for bundles: %v", err) + return nil, 0, fmt.Errorf("failed to get key for bundles: %v", err) } keys = append(keys, bundlesKeyBuilder.Key(k)) dbBatch.PutWithTTL(bundlesKeyBuilder.Key(k), bundle, s.ttl) + size += uint64(len(bundle)) } } if err := dbBatch.Apply(); err != nil { - return nil, fmt.Errorf("failed to apply batch: %v", err) + return nil, 0, fmt.Errorf("failed to apply batch: %v", err) } - return keys, nil + return keys, size, nil } func (s *storeV2) DeleteKeys(keys []kvstore.Key) error { diff --git a/node/store_v2_test.go b/node/store_v2_test.go index 26ee3a3bc0..814c61c0de 100644 --- a/node/store_v2_test.go +++ b/node/store_v2_test.go @@ -42,7 +42,7 @@ func TestStoreBatchV2(t *testing.T) { defer func() { _ = db.Shutdown() }() - keys, err := s.StoreBatch(batch, rawBundles) + keys, _, err := s.StoreBatch(batch, rawBundles) require.NoError(t, err) require.Len(t, keys, 7) @@ -78,7 +78,7 @@ func TestStoreBatchV2(t *testing.T) { } // Try to store the same batch again - _, err = s.StoreBatch(batch, rawBundles) + _, _, err = s.StoreBatch(batch, rawBundles) require.ErrorIs(t, err, node.ErrBatchAlreadyExist) // Check deletion @@ -129,7 +129,7 @@ func TestGetChunks(t *testing.T) { defer func() { _ = db.Shutdown() }() - _, err := s.StoreBatch(batch, rawBundles) + _, _, err := s.StoreBatch(batch, rawBundles) require.NoError(t, err) chunks, err := s.GetChunks(blobKeys[0], 0) diff --git a/relay/cache/cache_accessor_metrics.go b/relay/cache/cache_accessor_metrics.go index e5ef456218..c873f25670 100644 --- a/relay/cache/cache_accessor_metrics.go +++ b/relay/cache/cache_accessor_metrics.go @@ -2,6 +2,7 @@ package cache import ( "fmt" + "github.com/Layr-Labs/eigenda/common" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "time" @@ -110,5 +111,5 @@ func (m *CacheAccessorMetrics) ReportAverageWeight(averageWeight float64) { } func (m *CacheAccessorMetrics) ReportCacheMissLatency(duration time.Duration) { - m.cacheMissLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.cacheMissLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } diff --git a/relay/metrics/metrics.go b/relay/metrics/metrics.go index faa19d1f75..702b7c7a04 100644 --- a/relay/metrics/metrics.go +++ b/relay/metrics/metrics.go @@ -2,6 +2,7 @@ package metrics import ( "fmt" + "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/relay/cache" "github.com/Layr-Labs/eigensdk-go/logging" grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" @@ -244,20 +245,19 @@ func (m *RelayMetrics) GetGRPCServerOption() grpc.ServerOption { } func (m *RelayMetrics) ReportChunkLatency(duration time.Duration) { - m.getChunksLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.getChunksLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *RelayMetrics) ReportChunkAuthenticationLatency(duration time.Duration) { - m.getChunksAuthenticationLatency.WithLabelValues().Observe( - float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.getChunksAuthenticationLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *RelayMetrics) ReportChunkMetadataLatency(duration time.Duration) { - m.getChunksMetadataLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.getChunksMetadataLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *RelayMetrics) ReportChunkDataLatency(duration time.Duration) { - m.getChunksDataLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.getChunksDataLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *RelayMetrics) ReportChunkAuthFailure() { @@ -277,15 +277,15 @@ func (m *RelayMetrics) ReportChunkDataSize(size int) { } func (m *RelayMetrics) ReportBlobLatency(duration time.Duration) { - m.getBlobLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.getBlobLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *RelayMetrics) ReportBlobMetadataLatency(duration time.Duration) { - m.getBlobMetadataLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.getBlobMetadataLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *RelayMetrics) ReportBlobDataLatency(duration time.Duration) { - m.getBlobDataLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) + m.getBlobDataLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } func (m *RelayMetrics) ReportBlobRateLimited(reason string) { diff --git a/test/integration_test.go b/test/integration_test.go index 5016f3598f..a1820bfe67 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "errors" "fmt" + "github.com/stretchr/testify/require" "log" "math" "math/big" @@ -418,7 +419,8 @@ func mustMakeOperators(t *testing.T, cst *coremock.ChainDataMock, logger logging ratelimiter := &commonmock.NoopRatelimiter{} serverV1 := nodegrpc.NewServer(config, n, logger, ratelimiter) - serverV2 := nodegrpc.NewServerV2(config, n, logger, ratelimiter) + serverV2, err := nodegrpc.NewServerV2(config, n, logger, ratelimiter, prometheus.NewRegistry()) + require.NoError(t, err) ops[id] = TestOperator{ Node: n,