diff --git a/disperser/cmd/dataapi/main.go b/disperser/cmd/dataapi/main.go index 5cf2be2234..7a693ad652 100644 --- a/disperser/cmd/dataapi/main.go +++ b/disperser/cmd/dataapi/main.go @@ -19,6 +19,7 @@ import ( "github.com/Layr-Labs/eigenda/disperser/dataapi" "github.com/Layr-Labs/eigenda/disperser/dataapi/prometheus" "github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph" + "github.com/Layr-Labs/eigenda/operators/ejector" walletsdk "github.com/Layr-Labs/eigensdk-go/chainio/clients/wallet" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/Layr-Labs/eigensdk-go/signerv2" @@ -124,7 +125,7 @@ func RunDataApi(ctx *cli.Context) error { subgraphClient, tx, chainState, - dataapi.NewEjector(wallet, client, logger, tx, metrics, config.TxnTimeout, config.NonsigningRateThreshold), + ejector.NewEjector(wallet, client, logger, tx, metrics.EjectorMetrics, config.TxnTimeout, config.NonsigningRateThreshold), logger, metrics, nil, diff --git a/disperser/dataapi/metrics.go b/disperser/dataapi/metrics.go index 6cf82d5b20..c1f709deaa 100644 --- a/disperser/dataapi/metrics.go +++ b/disperser/dataapi/metrics.go @@ -7,12 +7,12 @@ import ( "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/disperser/common/blobstore" + "github.com/Layr-Labs/eigenda/operators/ejector" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" - "google.golang.org/grpc/codes" ) type MetricsConfig struct { @@ -23,14 +23,9 @@ type MetricsConfig struct { type Metrics struct { registry *prometheus.Registry - NumRequests *prometheus.CounterVec - Latency *prometheus.SummaryVec - - PeriodicEjectionRequests *prometheus.CounterVec - UrgentEjectionRequests *prometheus.CounterVec - OperatorsToEject *prometheus.CounterVec - StakeShareToEject *prometheus.GaugeVec - EjectionGasUsed prometheus.Gauge + NumRequests *prometheus.CounterVec + Latency *prometheus.SummaryVec + EjectorMetrics *ejector.Metrics httpPort string logger logging.Logger @@ -60,58 +55,10 @@ func NewMetrics(blobMetadataStore *blobstore.BlobMetadataStore, httpPort string, }, []string{"method"}, ), - // PeriodicEjectionRequests is a more detailed metric than NumRequests, specifically for - // tracking the ejection calls that are periodically initiated according to the SLA - // evaluation time window. - PeriodicEjectionRequests: promauto.With(reg).NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Name: "periodic_ejection_requests_total", - Help: "the total number of periodic ejection requests", - }, - []string{"status"}, - ), - // UrgentEjectionRequests is a more detailed metric than NumRequests, specifically for - // tracking the ejection calls that are urgently initiated due to bad network health - // condition. - UrgentEjectionRequests: promauto.With(reg).NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Name: "urgent_ejection_requests_total", - Help: "the total number of urgent ejection requests", - }, - []string{"status"}, - ), - // The number of operators requested to eject. Note this may be different than the - // actual number of operators ejected as EjectionManager contract may perform rate - // limiting. - OperatorsToEject: promauto.With(reg).NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Name: "operators_to_eject", - Help: "the total number of operators requested to eject", - }, []string{"quorum"}, - ), - // The total stake share requested to eject. Note this may be different than the - // actual stake share ejected as EjectionManager contract may perform rate limiting. - StakeShareToEject: promauto.With(reg).NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: namespace, - Name: "stake_share_to_eject", - Help: "the total stake share requested to eject", - }, []string{"quorum"}, - ), - // The gas used by EjectionManager contract for operator ejection. - EjectionGasUsed: promauto.With(reg).NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Name: "ejection_gas_used", - Help: "Gas used for operator ejection", - }, - ), - registry: reg, - httpPort: httpPort, - logger: logger.With("component", "DataAPIMetrics"), + EjectorMetrics: ejector.NewMetrics(reg, logger), + registry: reg, + httpPort: httpPort, + logger: logger.With("component", "DataAPIMetrics"), } return metrics } @@ -137,38 +84,6 @@ func (g *Metrics) IncrementFailedRequestNum(method string) { }).Inc() } -func (g *Metrics) IncrementEjectionRequest(mode string, status codes.Code) { - switch mode { - case "periodic": - g.PeriodicEjectionRequests.With(prometheus.Labels{ - "status": status.String(), - }).Inc() - case "urgent": - g.UrgentEjectionRequests.With(prometheus.Labels{ - "status": status.String(), - }).Inc() - } -} - -func (g *Metrics) UpdateRequestedOperatorMetric(numOperatorsByQuorum map[uint8]int, stakeShareByQuorum map[uint8]float64) { - for q, count := range numOperatorsByQuorum { - for i := 0; i < count; i++ { - g.OperatorsToEject.With(prometheus.Labels{ - "quorum": fmt.Sprintf("%d", q), - }).Inc() - } - } - for q, stakeShare := range stakeShareByQuorum { - g.StakeShareToEject.With(prometheus.Labels{ - "quorum": fmt.Sprintf("%d", q), - }).Set(stakeShare) - } -} - -func (g *Metrics) UpdateEjectionGasUsed(gasUsed uint64) { - g.EjectionGasUsed.Set(float64(gasUsed)) -} - // IncrementNotFoundRequestNum increments the number of not found requests func (g *Metrics) IncrementNotFoundRequestNum(method string) { g.NumRequests.With(prometheus.Labels{ diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index e17817b900..678aeb68bf 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -15,8 +15,8 @@ import ( "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/encoding" + "github.com/Layr-Labs/eigenda/operators/ejector" "github.com/Layr-Labs/eigensdk-go/logging" - "google.golang.org/grpc/codes" "google.golang.org/grpc/health/grpc_health_v1" "github.com/Layr-Labs/eigenda/disperser" @@ -94,10 +94,6 @@ type ( Timestamp uint64 `json:"timestamp"` } - EjectionResponse struct { - TransactionHash string `json:"transaction_hash"` - } - Meta struct { Size int `json:"size"` } @@ -170,7 +166,7 @@ type ( subgraphClient SubgraphClient transactor core.Transactor chainState core.ChainState - ejector *Ejector + ejector *ejector.Ejector ejectionToken string metrics *Metrics @@ -189,7 +185,7 @@ func NewServer( subgraphClient SubgraphClient, transactor core.Transactor, chainState core.ChainState, - ejector *Ejector, + ejector *ejector.Ejector, logger logging.Logger, metrics *Metrics, grpcConn GRPCConn, @@ -344,9 +340,9 @@ func (s *server) EjectOperatorsHandler(c *gin.Context) { return } - mode := "periodic" - if c.Query("mode") != "" { - mode = c.Query("mode") + mode := ejector.PeriodicMode + if c.Query("mode") == "urgent" { + mode = ejector.UrgentMode } endTime := time.Now() @@ -355,7 +351,6 @@ func (s *server) EjectOperatorsHandler(c *gin.Context) { endTime, err = time.Parse("2006-01-02T15:04:05Z", c.Query("end")) if err != nil { s.metrics.IncrementFailedRequestNum("EjectOperators") - s.metrics.IncrementEjectionRequest(mode, codes.InvalidArgument) errorResponse(c, err) return } @@ -367,18 +362,27 @@ func (s *server) EjectOperatorsHandler(c *gin.Context) { } nonSigningRate, err := s.getOperatorNonsigningRate(c.Request.Context(), endTime.Unix()-interval, endTime.Unix(), true) - var ejectionResponse *EjectionResponse + var ejectionResponse *ejector.EjectionResponse if err == nil { - ejectionResponse, err = s.ejector.Eject(c.Request.Context(), nonSigningRate) + nonSigningMetrics := make([]*ejector.NonSignerMetric, 0) + for _, metric := range nonSigningRate.Data { + nonSigningMetrics = append(nonSigningMetrics, &ejector.NonSignerMetric{ + OperatorId: metric.OperatorId, + OperatorAddress: metric.OperatorAddress, + QuorumId: metric.QuorumId, + TotalUnsignedBatches: metric.TotalUnsignedBatches, + Percentage: metric.Percentage, + StakePercentage: metric.StakePercentage, + }) + } + ejectionResponse, err = s.ejector.Eject(c.Request.Context(), nonSigningMetrics, mode) } if err != nil { s.metrics.IncrementFailedRequestNum("EjectOperators") - s.metrics.IncrementEjectionRequest(mode, codes.Internal) errorResponse(c, err) return } s.metrics.IncrementSuccessfulRequestNum("EjectOperators") - s.metrics.IncrementEjectionRequest(mode, codes.OK) c.JSON(http.StatusOK, ejectionResponse) } diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go index 49159dec80..8711a918ee 100644 --- a/disperser/dataapi/server_test.go +++ b/disperser/dataapi/server_test.go @@ -25,6 +25,7 @@ import ( "github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph" subgraphmock "github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph/mock" "github.com/Layr-Labs/eigenda/encoding" + "github.com/Layr-Labs/eigenda/operators/ejector" sdkmock "github.com/Layr-Labs/eigensdk-go/chainio/clients/mocks" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/consensys/gnark-crypto/ecc/bn254/fp" @@ -386,7 +387,7 @@ func TestEjectOperatorHandler(t *testing.T) { data, err := io.ReadAll(res.Body) assert.NoError(t, err) - var response dataapi.EjectionResponse + var response ejector.EjectionResponse err = json.Unmarshal(data, &response) assert.NoError(t, err) assert.NotNil(t, response) @@ -455,14 +456,14 @@ func TestFetchUnsignedBatchesHandler(t *testing.T) { type ejectorComponents struct { wallet *sdkmock.MockWallet ethClient *commonmock.MockEthClient - ejector *dataapi.Ejector + ejector *ejector.Ejector } func getEjector(t *testing.T) *ejectorComponents { ctrl := gomock.NewController(t) w := sdkmock.NewMockWallet(ctrl) ethClient := &commonmock.MockEthClient{} - ejector := dataapi.NewEjector(w, ethClient, mockLogger, mockTx, metrics, 100*time.Millisecond, -1) + ejector := ejector.NewEjector(w, ethClient, mockLogger, mockTx, metrics.EjectorMetrics, 100*time.Millisecond, -1) return &ejectorComponents{ wallet: w, ethClient: ethClient, diff --git a/disperser/dataapi/ejector.go b/operators/ejector/ejector.go similarity index 82% rename from disperser/dataapi/ejector.go rename to operators/ejector/ejector.go index 076b2ef88e..1559405b75 100644 --- a/disperser/dataapi/ejector.go +++ b/operators/ejector/ejector.go @@ -1,4 +1,4 @@ -package dataapi +package ejector import ( "context" @@ -16,6 +16,7 @@ import ( "github.com/Layr-Labs/eigensdk-go/logging" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core/types" + "google.golang.org/grpc/codes" ) const ( @@ -23,6 +24,26 @@ const ( queryTickerDuration = 3 * time.Second ) +type EjectionResponse struct { + TransactionHash string `json:"transaction_hash"` +} + +type NonSignerMetric struct { + OperatorId string `json:"operator_id"` + OperatorAddress string `json:"operator_address"` + QuorumId uint8 `json:"quorum_id"` + TotalUnsignedBatches int `json:"total_unsigned_batches"` + Percentage float64 `json:"percentage"` + StakePercentage float64 `json:"stake_percentage"` +} + +type Mode string + +const ( + PeriodicMode Mode = "periodic" + UrgentMode Mode = "urgent" +) + // stakeShareToSLA returns the SLA for a given stake share in a quorum. // The caller should ensure "stakeShare" is in range (0, 1]. func stakeShareToSLA(stakeShare float64) float64 { @@ -49,7 +70,7 @@ func operatorPerfScore(stakeShare float64, nonsigningRate float64) float64 { return perf / (1.0 + perf) } -func computePerfScore(metric *OperatorNonsigningPercentageMetrics) float64 { +func computePerfScore(metric *NonSignerMetric) float64 { return operatorPerfScore(metric.StakePercentage, metric.Percentage) } @@ -78,12 +99,12 @@ func NewEjector(wallet walletsdk.Wallet, ethClient common.EthClient, logger logg } } -func (e *Ejector) Eject(ctx context.Context, nonsigningRate *OperatorsNonsigningPercentage) (*EjectionResponse, error) { +func (e *Ejector) Eject(ctx context.Context, nonsignerMetrics []*NonSignerMetric, mode Mode) (*EjectionResponse, error) { e.mu.Lock() defer e.mu.Unlock() - nonsigners := make([]*OperatorNonsigningPercentageMetrics, 0) - for _, metric := range nonsigningRate.Data { + nonsigners := make([]*NonSignerMetric, 0) + for _, metric := range nonsignerMetrics { // If nonsigningRateThreshold is set and valid, we will only eject operators with // nonsigning rate >= nonsigningRateThreshold. if e.nonsigningRateThreshold >= 10 && e.nonsigningRateThreshold <= 100 && metric.Percentage < float64(e.nonsigningRateThreshold) { @@ -110,11 +131,13 @@ func (e *Ejector) Eject(ctx context.Context, nonsigningRate *OperatorsNonsigning operatorsByQuorum, err := e.convertOperators(nonsigners) if err != nil { + e.metrics.IncrementEjectionRequest(mode, codes.Internal) return nil, err } txn, err := e.transactor.BuildEjectOperatorsTxn(ctx, operatorsByQuorum) if err != nil { + e.metrics.IncrementEjectionRequest(mode, codes.Internal) e.logger.Error("Failed to build ejection transaction", "err", err) return nil, err } @@ -124,11 +147,13 @@ func (e *Ejector) Eject(ctx context.Context, nonsigningRate *OperatorsNonsigning for retryFromFailure < maxSendTransactionRetry { gasTipCap, gasFeeCap, err := e.ethClient.GetLatestGasCaps(ctx) if err != nil { + e.metrics.IncrementEjectionRequest(mode, codes.Internal) return nil, fmt.Errorf("failed to get latest gas caps: %w", err) } txn, err = e.ethClient.UpdateGas(ctx, txn, big.NewInt(0), gasTipCap, gasFeeCap) if err != nil { + e.metrics.IncrementEjectionRequest(mode, codes.Internal) return nil, fmt.Errorf("failed to update gas price: %w", err) } txID, err = e.wallet.SendTransaction(ctx, txn) @@ -142,6 +167,7 @@ func (e *Ejector) Eject(ctx context.Context, nonsigningRate *OperatorsNonsigning retryFromFailure++ continue } else if err != nil { + e.metrics.IncrementEjectionRequest(mode, codes.Internal) return nil, fmt.Errorf("failed to send txn %s: %w", txn.Hash().Hex(), err) } else { e.logger.Debug("successfully sent txn", "txID", txID, "txHash", txn.Hash().Hex()) @@ -165,9 +191,11 @@ func (e *Ejector) Eject(ctx context.Context, nonsigningRate *OperatorsNonsigning } else if errors.Is(err, walletsdk.ErrNotYetBroadcasted) { e.logger.Warn("Transaction has not been broadcasted to network but attempted to retrieve receipt", "err", err) } else if errors.Is(err, walletsdk.ErrTransactionFailed) { + e.metrics.IncrementEjectionRequest(mode, codes.Internal) e.logger.Error("Transaction failed", "txID", txID, "txHash", txn.Hash().Hex(), "err", err) return nil, err } else { + e.metrics.IncrementEjectionRequest(mode, codes.Internal) e.logger.Error("Transaction receipt retrieval failed", "err", err) return nil, err } @@ -175,6 +203,7 @@ func (e *Ejector) Eject(ctx context.Context, nonsigningRate *OperatorsNonsigning // Wait for the next round. select { case <-ctxWithTimeout.Done(): + e.metrics.IncrementEjectionRequest(mode, codes.Internal) return nil, ctxWithTimeout.Err() case <-queryTicker.C: } @@ -189,10 +218,11 @@ func (e *Ejector) Eject(ctx context.Context, nonsigningRate *OperatorsNonsigning TransactionHash: receipt.TxHash.Hex(), } + e.metrics.IncrementEjectionRequest(mode, codes.OK) return ejectionResponse, nil } -func (e *Ejector) convertOperators(nonsigners []*OperatorNonsigningPercentageMetrics) ([][]core.OperatorID, error) { +func (e *Ejector) convertOperators(nonsigners []*NonSignerMetric) ([][]core.OperatorID, error) { var maxQuorumId uint8 for _, metric := range nonsigners { if metric.QuorumId > maxQuorumId { diff --git a/operators/ejector/metrics.go b/operators/ejector/metrics.go new file mode 100644 index 0000000000..151df76d1e --- /dev/null +++ b/operators/ejector/metrics.go @@ -0,0 +1,106 @@ +package ejector + +import ( + "fmt" + + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "google.golang.org/grpc/codes" +) + +type Metrics struct { + PeriodicEjectionRequests *prometheus.CounterVec + UrgentEjectionRequests *prometheus.CounterVec + OperatorsToEject *prometheus.CounterVec + StakeShareToEject *prometheus.GaugeVec + EjectionGasUsed prometheus.Gauge +} + +func NewMetrics(reg *prometheus.Registry, logger logging.Logger) *Metrics { + namespace := "eigenda_ejector" + metrics := &Metrics{ + // PeriodicEjectionRequests is a more detailed metric than NumRequests, specifically for + // tracking the ejection calls that are periodically initiated according to the SLA + // evaluation time window. + PeriodicEjectionRequests: promauto.With(reg).NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "periodic_ejection_requests_total", + Help: "the total number of periodic ejection requests", + }, + []string{"status"}, + ), + // UrgentEjectionRequests is a more detailed metric than NumRequests, specifically for + // tracking the ejection calls that are urgently initiated due to bad network health + // condition. + UrgentEjectionRequests: promauto.With(reg).NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "urgent_ejection_requests_total", + Help: "the total number of urgent ejection requests", + }, + []string{"status"}, + ), + // The number of operators requested to eject. Note this may be different than the + // actual number of operators ejected as EjectionManager contract may perform rate + // limiting. + OperatorsToEject: promauto.With(reg).NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "operators_to_eject", + Help: "the total number of operators requested to eject", + }, []string{"quorum"}, + ), + // The total stake share requested to eject. Note this may be different than the + // actual stake share ejected as EjectionManager contract may perform rate limiting. + StakeShareToEject: promauto.With(reg).NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "stake_share_to_eject", + Help: "the total stake share requested to eject", + }, []string{"quorum"}, + ), + // The gas used by EjectionManager contract for operator ejection. + EjectionGasUsed: promauto.With(reg).NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "ejection_gas_used", + Help: "Gas used for operator ejection", + }, + ), + } + return metrics +} + +func (g *Metrics) IncrementEjectionRequest(mode Mode, status codes.Code) { + switch mode { + case PeriodicMode: + g.PeriodicEjectionRequests.With(prometheus.Labels{ + "status": status.String(), + }).Inc() + case UrgentMode: + g.UrgentEjectionRequests.With(prometheus.Labels{ + "status": status.String(), + }).Inc() + } +} + +func (g *Metrics) UpdateEjectionGasUsed(gasUsed uint64) { + g.EjectionGasUsed.Set(float64(gasUsed)) +} + +func (g *Metrics) UpdateRequestedOperatorMetric(numOperatorsByQuorum map[uint8]int, stakeShareByQuorum map[uint8]float64) { + for q, count := range numOperatorsByQuorum { + for i := 0; i < count; i++ { + g.OperatorsToEject.With(prometheus.Labels{ + "quorum": fmt.Sprintf("%d", q), + }).Inc() + } + } + for q, stakeShare := range stakeShareByQuorum { + g.StakeShareToEject.With(prometheus.Labels{ + "quorum": fmt.Sprintf("%d", q), + }).Set(stakeShare) + } +}