Skip to content

Commit

Permalink
Node metrics (#948)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley authored Dec 16, 2024
1 parent 2d7a94e commit 80231c3
Show file tree
Hide file tree
Showing 18 changed files with 244 additions and 73 deletions.
7 changes: 7 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package common
import (
"bytes"
"crypto/sha256"
"time"
"unsafe"

"github.com/fxamacker/cbor/v2"
Expand Down Expand Up @@ -62,3 +63,9 @@ func DecodeFromBytes[T any](b []byte) (T, error) {
}
return t, nil
}

// ToMilliseconds expresses the given duration as a fractional number of milliseconds.
//
// duration.Milliseconds() truncates to a whole int64; this helper instead divides the nanosecond
// count in floating point, so sub-millisecond detail is kept (up to float64 precision).
func ToMilliseconds(duration time.Duration) float64 {
	const nanosPerMilli = float64(time.Millisecond)
	nanos := float64(duration.Nanoseconds())
	return nanos / nanosPerMilli
}
13 changes: 7 additions & 6 deletions disperser/apiserver/metrics_v2.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package apiserver

import (
"github.com/Layr-Labs/eigenda/common"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -116,15 +117,15 @@ func newAPIServerV2Metrics(registry *prometheus.Registry) *metricsV2 {
}

// reportGetBlobCommitmentLatency observes the given duration on the getBlobCommitmentLatency
// metric, expressed in fractional milliseconds via common.ToMilliseconds.
func (m *metricsV2) reportGetBlobCommitmentLatency(duration time.Duration) {
	m.getBlobCommitmentLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportGetPaymentStateLatency observes the given duration on the getPaymentStateLatency
// metric, expressed in fractional milliseconds via common.ToMilliseconds.
func (m *metricsV2) reportGetPaymentStateLatency(duration time.Duration) {
	m.getPaymentStateLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportDisperseBlobLatency observes the given duration on the disperseBlobLatency
// metric, expressed in fractional milliseconds via common.ToMilliseconds.
func (m *metricsV2) reportDisperseBlobLatency(duration time.Duration) {
	m.disperseBlobLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *metricsV2) reportDisperseBlobSize(size int) {
Expand All @@ -133,13 +134,13 @@ func (m *metricsV2) reportDisperseBlobSize(size int) {

func (m *metricsV2) reportValidateDispersalRequestLatency(duration time.Duration) {
m.validateDispersalRequestLatency.WithLabelValues().Observe(
float64(duration.Nanoseconds()) / float64(time.Millisecond))
common.ToMilliseconds(duration))
}

// reportStoreBlobLatency observes the given duration on the storeBlobLatency metric,
// expressed in fractional milliseconds via common.ToMilliseconds.
func (m *metricsV2) reportStoreBlobLatency(duration time.Duration) {
	m.storeBlobLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportGetBlobStatusLatency observes the given duration on the getBlobStatusLatency metric,
// expressed in fractional milliseconds via common.ToMilliseconds.
func (m *metricsV2) reportGetBlobStatusLatency(duration time.Duration) {
	m.getBlobStatusLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}
37 changes: 19 additions & 18 deletions disperser/controller/dispatcher_metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller

import (
"github.com/Layr-Labs/eigenda/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"time"
Expand Down Expand Up @@ -248,77 +249,77 @@ func newDispatcherMetrics(registry *prometheus.Registry) *dispatcherMetrics {
}

// reportHandleBatchLatency observes the given duration on the handleBatchLatency metric,
// expressed in fractional milliseconds via common.ToMilliseconds.
func (m *dispatcherMetrics) reportHandleBatchLatency(duration time.Duration) {
	m.handleBatchLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportNewBatchLatency observes the given duration on the newBatchLatency metric,
// expressed in fractional milliseconds via common.ToMilliseconds.
func (m *dispatcherMetrics) reportNewBatchLatency(duration time.Duration) {
	m.newBatchLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportGetBlobMetadataLatency observes the given duration on the getBlobMetadataLatency
// metric, expressed in fractional milliseconds via common.ToMilliseconds.
func (m *dispatcherMetrics) reportGetBlobMetadataLatency(duration time.Duration) {
	m.getBlobMetadataLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportGetOperatorStateLatency observes the given duration on the getOperatorStateLatency
// metric, expressed in fractional milliseconds via common.ToMilliseconds.
func (m *dispatcherMetrics) reportGetOperatorStateLatency(duration time.Duration) {
	m.getOperatorStateLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportGetBlobCertificatesLatency observes the given duration on the
// getBlobCertificatesLatency metric, expressed in fractional milliseconds via
// common.ToMilliseconds.
func (m *dispatcherMetrics) reportGetBlobCertificatesLatency(duration time.Duration) {
	m.getBlobCertificatesLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportBuildMerkleTreeLatency observes the given duration on the buildMerkleTreeLatency
// metric, expressed in fractional milliseconds via common.ToMilliseconds.
func (m *dispatcherMetrics) reportBuildMerkleTreeLatency(duration time.Duration) {
	m.buildMerkleTreeLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportPutBatchHeaderLatency observes the given duration on the putBatchHeaderLatency
// metric, expressed in fractional milliseconds via common.ToMilliseconds.
func (m *dispatcherMetrics) reportPutBatchHeaderLatency(duration time.Duration) {
	m.putBatchHeaderLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportProofLatency observes the given duration on the proofLatency metric,
// expressed in fractional milliseconds via common.ToMilliseconds.
func (m *dispatcherMetrics) reportProofLatency(duration time.Duration) {
	m.proofLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportPutVerificationInfosLatency observes the given duration on the
// putVerificationInfosLatency metric, expressed in fractional milliseconds via
// common.ToMilliseconds.
func (m *dispatcherMetrics) reportPutVerificationInfosLatency(duration time.Duration) {
	m.putVerificationInfosLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportPoolSubmissionLatency observes the given duration on the poolSubmissionLatency
// metric, expressed in fractional milliseconds via common.ToMilliseconds.
func (m *dispatcherMetrics) reportPoolSubmissionLatency(duration time.Duration) {
	m.poolSubmissionLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportPutDispersalRequestLatency observes the given duration on the
// putDispersalRequestLatency metric, expressed in fractional milliseconds via
// common.ToMilliseconds.
func (m *dispatcherMetrics) reportPutDispersalRequestLatency(duration time.Duration) {
	m.putDispersalRequestLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportSendChunksLatency observes the given duration on the sendChunksLatency metric,
// expressed in fractional milliseconds via common.ToMilliseconds.
func (m *dispatcherMetrics) reportSendChunksLatency(duration time.Duration) {
	m.sendChunksLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportSendChunksRetryCount sets the sendChunksRetryCount metric to the given retry count.
func (m *dispatcherMetrics) reportSendChunksRetryCount(retries float64) {
	retryMetric := m.sendChunksRetryCount.WithLabelValues()
	retryMetric.Set(retries)
}

// reportPutDispersalResponseLatency observes the given duration on the
// putDispersalResponseLatency metric, expressed in fractional milliseconds via
// common.ToMilliseconds.
func (m *dispatcherMetrics) reportPutDispersalResponseLatency(duration time.Duration) {
	m.putDispersalResponseLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportHandleSignaturesLatency observes the given duration on the handleSignaturesLatency
// metric, expressed in fractional milliseconds via common.ToMilliseconds.
func (m *dispatcherMetrics) reportHandleSignaturesLatency(duration time.Duration) {
	m.handleSignaturesLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportReceiveSignaturesLatency observes the given duration on the receiveSignaturesLatency
// metric, expressed in fractional milliseconds via common.ToMilliseconds.
func (m *dispatcherMetrics) reportReceiveSignaturesLatency(duration time.Duration) {
	m.receiveSignaturesLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportAggregateSignaturesLatency observes the given duration on the
// aggregateSignaturesLatency metric, expressed in fractional milliseconds via
// common.ToMilliseconds.
func (m *dispatcherMetrics) reportAggregateSignaturesLatency(duration time.Duration) {
	m.aggregateSignaturesLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportPutAttestationLatency observes the given duration on the putAttestationLatency
// metric, expressed in fractional milliseconds via common.ToMilliseconds.
func (m *dispatcherMetrics) reportPutAttestationLatency(duration time.Duration) {
	m.putAttestationLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportUpdateBatchStatusLatency observes the given duration on the updateBatchStatusLatency
// metric, expressed in fractional milliseconds via common.ToMilliseconds.
func (m *dispatcherMetrics) reportUpdateBatchStatusLatency(duration time.Duration) {
	m.updateBatchStatusLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}
11 changes: 6 additions & 5 deletions disperser/controller/encoding_manager_metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller

import (
common "github.com/Layr-Labs/eigenda/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"time"
Expand Down Expand Up @@ -123,23 +124,23 @@ func newEncodingManagerMetrics(registry *prometheus.Registry) *encodingManagerMe
}

// reportBatchSubmissionLatency observes the given duration on the batchSubmissionLatency
// metric, expressed in fractional milliseconds via common.ToMilliseconds.
func (m *encodingManagerMetrics) reportBatchSubmissionLatency(duration time.Duration) {
	m.batchSubmissionLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportBlobHandleLatency observes the given duration on the blobHandleLatency metric,
// expressed in fractional milliseconds via common.ToMilliseconds.
func (m *encodingManagerMetrics) reportBlobHandleLatency(duration time.Duration) {
	m.blobHandleLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportEncodingLatency observes the given duration on the encodingLatency metric,
// expressed in fractional milliseconds via common.ToMilliseconds.
func (m *encodingManagerMetrics) reportEncodingLatency(duration time.Duration) {
	m.encodingLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportPutBlobCertLatency observes the given duration on the putBlobCertLatency metric,
// expressed in fractional milliseconds via common.ToMilliseconds.
func (m *encodingManagerMetrics) reportPutBlobCertLatency(duration time.Duration) {
	m.putBlobCertLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

// reportUpdateBlobStatusLatency observes the given duration on the updateBlobStatusLatency
// metric, expressed in fractional milliseconds via common.ToMilliseconds.
func (m *encodingManagerMetrics) reportUpdateBlobStatusLatency(duration time.Duration) {
	m.updateBlobStatusLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *encodingManagerMetrics) reportBatchSize(size int) {
Expand Down
8 changes: 7 additions & 1 deletion node/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,14 @@ func NodeMain(ctx *cli.Context) error {
}

// Creates the GRPC server.

// TODO(cody-littley): the metrics server is currently started by eigenmetrics, which is in another repo.
// When we fully remove v1 support, we need to start the metrics server inside the v2 metrics code.
server := nodegrpc.NewServer(config, node, logger, ratelimiter)
serverV2 := nodegrpc.NewServerV2(config, node, logger, ratelimiter)
serverV2, err := nodegrpc.NewServerV2(config, node, logger, ratelimiter, reg)
if err != nil {
return fmt.Errorf("failed to create server v2: %v", err)
}
err = nodegrpc.RunServers(server, serverV2, config, logger)

return err
Expand Down
4 changes: 2 additions & 2 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Config struct {
EnableNodeApi bool
NodeApiPort string
EnableMetrics bool
MetricsPort string
MetricsPort int
OnchainMetricsInterval int64
Timeout time.Duration
RegisterNodeAtStart bool
Expand Down Expand Up @@ -207,7 +207,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
EnableNodeApi: ctx.GlobalBool(flags.EnableNodeApiFlag.Name),
NodeApiPort: ctx.GlobalString(flags.NodeApiPortFlag.Name),
EnableMetrics: ctx.GlobalBool(flags.EnableMetricsFlag.Name),
MetricsPort: ctx.GlobalString(flags.MetricsPortFlag.Name),
MetricsPort: ctx.GlobalInt(flags.MetricsPortFlag.Name),
OnchainMetricsInterval: ctx.GlobalInt64(flags.OnchainMetricsIntervalFlag.Name),
Timeout: timeout,
RegisterNodeAtStart: registerNodeAtStart,
Expand Down
4 changes: 2 additions & 2 deletions node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ var (
Required: true,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "ENABLE_METRICS"),
}
MetricsPortFlag = cli.StringFlag{
MetricsPortFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "metrics-port"),
Usage: "Port at which node listens for metrics calls",
Required: false,
Value: "9091",
Value: 9091,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "METRICS_PORT"),
}
OnchainMetricsIntervalFlag = cli.StringFlag{
Expand Down
111 changes: 111 additions & 0 deletions node/grpc/metrics_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package grpc

import (
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigensdk-go/logging"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"time"
)

// namespace is the Prometheus namespace set on every metric that NewV2Metrics creates.
const namespace = "eigenda_node"

// MetricsV2 encapsulates metrics for the v2 DA node.
//
// Instances are built by NewV2Metrics, which creates each collector below against the
// supplied registry. Values are recorded through the Report* methods.
type MetricsV2 struct {
logger logging.Logger

// registry is the Prometheus registry handed to NewV2Metrics.
registry *prometheus.Registry
// grpcServerOption installs the unary interceptor that collects gRPC server metrics;
// it is exposed through GetGRPCServerOption.
grpcServerOption grpc.ServerOption

// storeChunksLatency holds StoreChunks() latency observations, in milliseconds.
storeChunksLatency *prometheus.SummaryVec
// storeChunksRequestSize holds the most recently reported StoreChunks() request size, in bytes.
storeChunksRequestSize *prometheus.GaugeVec

// getChunksLatency holds GetChunks() latency observations, in milliseconds.
getChunksLatency *prometheus.SummaryVec
// getChunksDataSize holds the most recently reported GetChunks() data size, in bytes.
getChunksDataSize *prometheus.GaugeVec
}

// NewV2Metrics builds a MetricsV2 whose collectors are all created against the supplied registry.
//
// It registers the gRPC server metrics first, then creates (in this order) the StoreChunks()
// latency summary, the StoreChunks() request size gauge, the GetChunks() latency summary and the
// GetChunks() data size gauge, all under the "eigenda_node" namespace and with no labels.
//
// The returned error is always nil in the current implementation.
// NOTE(review): MustRegister and promauto are documented to panic on registration failure rather
// than return an error — confirm against the client_golang version in use.
func NewV2Metrics(logger logging.Logger, registry *prometheus.Registry) (*MetricsV2, error) {

	// These should be re-enabled once the legacy v1 metrics are removed.
	//registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
	//registry.MustRegister(collectors.NewGoCollector())

	serverMetrics := grpcprom.NewServerMetrics()
	registry.MustRegister(serverMetrics)
	interceptorOption := grpc.UnaryInterceptor(serverMetrics.UnaryServerInterceptor())

	// newLatencySummary creates an unlabelled summary tracking the 50th, 90th and 99th percentiles.
	newLatencySummary := func(name string, help string) *prometheus.SummaryVec {
		opts := prometheus.SummaryOpts{
			Namespace:  namespace,
			Name:       name,
			Help:       help,
			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
		}
		return promauto.With(registry).NewSummaryVec(opts, []string{})
	}

	// newSizeGauge creates an unlabelled gauge.
	newSizeGauge := func(name string, help string) *prometheus.GaugeVec {
		opts := prometheus.GaugeOpts{
			Namespace: namespace,
			Name:      name,
			Help:      help,
		}
		return promauto.With(registry).NewGaugeVec(opts, []string{})
	}

	metrics := &MetricsV2{
		logger:           logger,
		registry:         registry,
		grpcServerOption: interceptorOption,
	}

	// Collectors are created in the same order as before so registration order is unchanged.
	metrics.storeChunksLatency = newLatencySummary(
		"store_chunks_latency_ms",
		"The latency of a StoreChunks() RPC call.")
	metrics.storeChunksRequestSize = newSizeGauge(
		"store_chunks_request_size_bytes",
		"The size of the data requested to be stored by StoreChunks() RPC calls.")
	metrics.getChunksLatency = newLatencySummary(
		"get_chunks_latency_ms",
		"The latency of a GetChunks() RPC call.")
	metrics.getChunksDataSize = newSizeGauge(
		"get_chunks_data_size_bytes",
		"The size of the data requested to be retrieved by GetChunks() RPC calls.")

	return metrics, nil
}

// GetGRPCServerOption exposes the gRPC server option built by NewV2Metrics. Passing it to
// grpc.NewServer turns on automatic gRPC metrics collection for that server.
func (m *MetricsV2) GetGRPCServerOption() grpc.ServerOption {
	option := m.grpcServerOption
	return option
}

// ReportStoreChunksLatency observes the given latency, converted to fractional milliseconds,
// on the StoreChunks() latency summary.
func (m *MetricsV2) ReportStoreChunksLatency(latency time.Duration) {
	millis := common.ToMilliseconds(latency)
	m.storeChunksLatency.WithLabelValues().Observe(millis)
}

// ReportStoreChunksRequestSize sets the StoreChunks() request size gauge to the given size.
// The gauge is named with a _bytes suffix, so size is expected to be a byte count.
func (m *MetricsV2) ReportStoreChunksRequestSize(size uint64) {
	sizeGauge := m.storeChunksRequestSize.WithLabelValues()
	sizeGauge.Set(float64(size))
}

// ReportGetChunksLatency observes the given latency, converted to fractional milliseconds,
// on the GetChunks() latency summary.
func (m *MetricsV2) ReportGetChunksLatency(latency time.Duration) {
	millis := common.ToMilliseconds(latency)
	m.getChunksLatency.WithLabelValues().Observe(millis)
}

// ReportGetChunksDataSize sets the GetChunks() data size gauge to the given size.
// The gauge is named with a _bytes suffix, so size is expected to be a byte count.
func (m *MetricsV2) ReportGetChunksDataSize(size int) {
	sizeGauge := m.getChunksDataSize.WithLabelValues()
	sizeGauge.Set(float64(size))
}
4 changes: 2 additions & 2 deletions node/grpc/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logge
}

opt := grpc.MaxRecvMsgSize(60 * 1024 * 1024 * 1024) // 60 GiB
gs := grpc.NewServer(opt)
gs := grpc.NewServer(opt, serverV2.metrics.GetGRPCServerOption())

// Register reflection service on gRPC server
// This makes "grpcurl -plaintext localhost:9000 list" command work
Expand All @@ -60,7 +60,7 @@ func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logge
}

opt := grpc.MaxRecvMsgSize(1024 * 1024 * 300) // 300 MiB
gs := grpc.NewServer(opt)
gs := grpc.NewServer(opt, serverV2.metrics.GetGRPCServerOption())

// Register reflection service on gRPC server
// This makes "grpcurl -plaintext localhost:9000 list" command work
Expand Down
Loading

0 comments on commit 80231c3

Please sign in to comment.