Skip to content

Commit

Permalink
Revert changes to churner metrics. (#970)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley authored Dec 9, 2024
1 parent cc34304 commit 3552a36
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 141 deletions.
5 changes: 0 additions & 5 deletions metrics.md

This file was deleted.

3 changes: 1 addition & 2 deletions operators/churner/churner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ func TestProcessChurnRequest(t *testing.T) {
NumRetries: numRetries,
},
}
metrics, err := churner.NewMetrics(9001, logger)
assert.NoError(t, err)
metrics := churner.NewMetrics("9001", logger)
cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger, metrics)
assert.NoError(t, err)
assert.NotNil(t, cn)
Expand Down
5 changes: 1 addition & 4 deletions operators/churner/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,7 @@ func run(ctx *cli.Context) error {
logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint)
indexer := thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger)

metrics, err := churner.NewMetrics(config.MetricsConfig.HTTPPort, logger)
if err != nil {
log.Fatalf("failed to create metrics: %v", err)
}
metrics := churner.NewMetrics(config.MetricsConfig.HTTPPort, logger)

cn, err := churner.NewChurner(config, indexer, tx, logger, metrics)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion operators/churner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
PerPublicKeyRateLimit: ctx.GlobalDuration(flags.PerPublicKeyRateLimit.Name),
ChurnApprovalInterval: ctx.GlobalDuration(flags.ChurnApprovalInterval.Name),
MetricsConfig: MetricsConfig{
HTTPPort: ctx.GlobalInt(flags.MetricsHTTPPort.Name),
HTTPPort: ctx.GlobalString(flags.MetricsHTTPPort.Name),
EnableMetrics: ctx.GlobalBool(flags.EnableMetrics.Name),
},
}, nil
Expand Down
4 changes: 2 additions & 2 deletions operators/churner/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ var (
EnvVar: common.PrefixEnvVar(envPrefix, "ENABLE_METRICS"),
}
/* Optional Flags*/
MetricsHTTPPort = cli.IntFlag{
MetricsHTTPPort = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "metrics-http-port"),
Usage: "the http port which the metrics prometheus server is listening",
Required: false,
Value: 9100,
Value: "9100",
EnvVar: common.PrefixEnvVar(envPrefix, "METRICS_HTTP_PORT"),
}
ChurnApprovalInterval = cli.DurationFlag{
Expand Down
33 changes: 0 additions & 33 deletions operators/churner/mdoc/churner-metrics.md

This file was deleted.

24 changes: 0 additions & 24 deletions operators/churner/mdoc/main.go

This file was deleted.

121 changes: 62 additions & 59 deletions operators/churner/metrics.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package churner

import (
"github.com/Layr-Labs/eigenda/common/metrics"
"time"
"context"
"fmt"
"net/http"

"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc/codes"
)

Expand All @@ -25,7 +28,7 @@ const (
)

// Note: statusCodeMap must be maintained in sync with failure reason constants.
var statusCodeMap = map[FailReason]string{
var statusCodeMap map[FailReason]string = map[FailReason]string{
FailReasonRateLimitExceeded: codes.ResourceExhausted.String(),
FailReasonInsufficientStakeToRegister: codes.InvalidArgument.String(),
FailReasonInsufficientStakeToChurn: codes.InvalidArgument.String(),
Expand All @@ -37,77 +40,63 @@ var statusCodeMap = map[FailReason]string{
}

type MetricsConfig struct {
HTTPPort int
HTTPPort string
EnableMetrics bool
}

type Metrics struct {
metricsServer metrics.Metrics
registry *prometheus.Registry

numRequests metrics.CountMetric
latency metrics.LatencyMetric
NumRequests *prometheus.CounterVec
Latency *prometheus.SummaryVec

logger logging.Logger
httpPort string
logger logging.Logger
}

type latencyLabel struct {
method string
}

type numRequestsLabel struct {
status string
method string
reason string
}

func NewMetrics(httpPort int, logger logging.Logger) (*Metrics, error) {
func NewMetrics(httpPort string, logger logging.Logger) *Metrics {
namespace := "eigenda_churner"
reg := prometheus.NewRegistry()
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
reg.MustRegister(collectors.NewGoCollector())

metricsServer := metrics.NewMetrics(logger, "eigenda_churner", httpPort)

numRequests, err := metricsServer.NewCountMetric(
"request",
"the number of requests",
numRequestsLabel{})
if err != nil {
return nil, err
metrics := &Metrics{
NumRequests: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "requests",
Help: "the number of requests",
},
[]string{"status", "reason", "method"},
),
Latency: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "latency_ms",
Help: "latency summary in milliseconds",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
},
[]string{"method"},
),
registry: reg,
httpPort: httpPort,
logger: logger.With("component", "ChurnerMetrics"),
}

latency, err := metricsServer.NewLatencyMetric(
"latency",
"latency summary in milliseconds",
latencyLabel{},
&metrics.Quantile{Quantile: 0.5, Error: 0.05},
&metrics.Quantile{Quantile: 0.9, Error: 0.01},
&metrics.Quantile{Quantile: 0.95, Error: 0.01},
&metrics.Quantile{Quantile: 0.99, Error: 0.001})
if err != nil {
return nil, err
}

return &Metrics{
metricsServer: metricsServer,
numRequests: numRequests,
latency: latency,
logger: logger.With("component", "ChurnerMetrics"),
}, nil
return metrics
}

// WriteMetricsDocumentation writes the metrics for the churner to a markdown file.
func (g *Metrics) WriteMetricsDocumentation() error {
return g.metricsServer.WriteMetricsDocumentation("operators/churner/mdoc/churner-metrics.md")
}

// ObserveLatency observes the latency of a stage
func (g *Metrics) ObserveLatency(method string, latency time.Duration) {
g.latency.ReportLatency(latency, latencyLabel{method: method})
// ObserveLatency observes the latency of a stage in 'stage
func (g *Metrics) ObserveLatency(method string, latencyMs float64) {
g.Latency.WithLabelValues(method).Observe(latencyMs)
}

// IncrementSuccessfulRequestNum increments the number of successful requests
func (g *Metrics) IncrementSuccessfulRequestNum(method string) {
g.numRequests.Increment(numRequestsLabel{status: "success", method: method})
g.NumRequests.With(prometheus.Labels{
"status": "success",
"method": method,
"reason": "",
}).Inc()
}

// IncrementFailedRequestNum increments the number of failed requests
Expand All @@ -119,11 +108,25 @@ func (g *Metrics) IncrementFailedRequestNum(method string, reason FailReason) {
// handle a negligence of mapping from failure reason to status code.
code = codes.Internal.String()
}

g.numRequests.Increment(numRequestsLabel{status: code, reason: string(reason), method: method})
g.NumRequests.With(prometheus.Labels{
"status": code,
"reason": string(reason),
"method": method,
}).Inc()
}

// Start starts the metrics server
func (g *Metrics) Start() error {
return g.metricsServer.Start()
func (g *Metrics) Start(ctx context.Context) {
g.logger.Info("Starting metrics server at ", "port", g.httpPort)
addr := fmt.Sprintf(":%s", g.httpPort)
go func() {
log := g.logger
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(
g.registry,
promhttp.HandlerOpts{},
))
err := http.ListenAndServe(addr, mux)
log.Error("Prometheus server failed", "err", err)
}()
}
10 changes: 3 additions & 7 deletions operators/churner/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,8 @@ func NewServer(
func (s *Server) Start(metricsConfig MetricsConfig) error {
// Enable Metrics Block
if metricsConfig.EnableMetrics {
httpSocket := fmt.Sprintf(":%d", metricsConfig.HTTPPort)
err := s.metrics.Start()
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}

httpSocket := fmt.Sprintf(":%s", metricsConfig.HTTPPort)
s.metrics.Start(context.Background())
s.logger.Info("Enabled metrics for Churner", "socket", httpSocket)
}
return nil
Expand All @@ -66,7 +62,7 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl
}

timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
s.metrics.ObserveLatency("Churn", time.Duration(f*float64(time.Second)))
s.metrics.ObserveLatency("Churn", f*1000) // make milliseconds
}))
defer timer.ObserveDuration()
s.logger.Info("Received request: ", "QuorumIds", req.GetQuorumIds())
Expand Down
3 changes: 1 addition & 2 deletions operators/churner/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,7 @@ func newTestServer(t *testing.T) *churner.Server {

setupMockWriter()

metrics, err := churner.NewMetrics(9001, logger)
assert.NoError(t, err)
metrics := churner.NewMetrics("9001", logger)
cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger, metrics)
if err != nil {
log.Fatalln("cannot create churner", err)
Expand Down
3 changes: 1 addition & 2 deletions operators/churner/tests/churner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,7 @@ func newTestServer(t *testing.T) *churner.Server {
)
assert.NoError(t, err)

metrics, err := churner.NewMetrics(9001, logger)
assert.NoError(t, err)
metrics := churner.NewMetrics("9001", logger)
cn, err := churner.NewChurner(config, mockIndexer, operatorTransactorChurner, logger, metrics)
assert.NoError(t, err)

Expand Down

0 comments on commit 3552a36

Please sign in to comment.