From 952b6dfa812f9e1a324bdf48c4bc20589ccb707d Mon Sep 17 00:00:00 2001 From: Ian Shim Date: Fri, 26 Apr 2024 11:20:32 -0700 Subject: [PATCH] integrate fireblocks wallet into ejector --- core/eth/tx.go | 14 +--- core/mock/tx.go | 4 +- core/tx.go | 4 +- disperser/cmd/dataapi/config.go | 10 +++ disperser/cmd/dataapi/flags/flags.go | 19 ++++++ disperser/cmd/dataapi/main.go | 80 +++++++++++++++++++++++ disperser/dataapi/ejector.go | 96 ++++++++++++++++++++++++++-- disperser/dataapi/server.go | 7 +- disperser/dataapi/server_test.go | 78 +++++++++++++++------- 9 files changed, 262 insertions(+), 50 deletions(-) diff --git a/core/eth/tx.go b/core/eth/tx.go index 57cdea0597..e564b59cc7 100644 --- a/core/eth/tx.go +++ b/core/eth/tx.go @@ -364,7 +364,7 @@ func (t *Transactor) UpdateOperatorSocket(ctx context.Context, socket string) er return nil } -func (t *Transactor) EjectOperators(ctx context.Context, operatorsByQuorum [][]core.OperatorID) (*types.Receipt, error) { +func (t *Transactor) BuildEjectOperatorsTxn(ctx context.Context, operatorsByQuorum [][]core.OperatorID) (*types.Transaction, error) { byteIdsByQuorum := make([][][32]byte, len(operatorsByQuorum)) for i, ids := range operatorsByQuorum { for _, id := range ids { @@ -376,17 +376,7 @@ func (t *Transactor) EjectOperators(ctx context.Context, operatorsByQuorum [][]c t.Logger.Error("Failed to generate transact opts", "err", err) return nil, err } - txn, err := t.Bindings.EjectionManager.EjectOperators(opts, byteIdsByQuorum) - if err != nil { - t.Logger.Error("Failed to create transaction", "err", err) - return nil, err - } - receipt, err := t.EthClient.EstimateGasPriceAndLimitAndSendTx(context.Background(), txn, "EjectOperators", nil) - if err != nil { - t.Logger.Error("Failed to eject operators", "err", err) - return nil, err - } - return receipt, nil + return t.Bindings.EjectionManager.EjectOperators(opts, byteIdsByQuorum) } // GetOperatorStakes returns the stakes of all operators within the quorums that the operator represented by operatorId diff --git a/core/mock/tx.go b/core/mock/tx.go index 646f33c0fd..f2ba688d91 100644 --- a/core/mock/tx.go +++ b/core/mock/tx.go @@ -70,10 +70,10 @@ func (t *MockTransactor) UpdateOperatorSocket(ctx context.Context, socket string return args.Error(0) } -func (t *MockTransactor) EjectOperators(ctx context.Context, operatorsByQuorum [][]core.OperatorID) (*types.Receipt, error) { +func (t *MockTransactor) BuildEjectOperatorsTxn(ctx context.Context, operatorsByQuorum [][]core.OperatorID) (*types.Transaction, error) { args := t.Called() result := args.Get(0) - return result.(*types.Receipt), args.Error(1) + return result.(*types.Transaction), args.Error(1) } func (t *MockTransactor) GetOperatorStakes(ctx context.Context, operatorId core.OperatorID, blockNumber uint32) (core.OperatorStakes, []core.QuorumID, error) { diff --git a/core/tx.go b/core/tx.go index b7d5f147aa..4b6ab74bd9 100644 --- a/core/tx.go +++ b/core/tx.go @@ -70,10 +70,10 @@ type Transactor interface { // UpdateOperatorSocket updates the socket of the operator in all the quorums that it is registered with. UpdateOperatorSocket(ctx context.Context, socket string) error - // EjectOperators ejects operators from AVS registryCoordinator. + // BuildEjectOperatorsTxn returns a transaction that ejects operators from AVS registryCoordinator. // The operatorsByQuorum provides a list of operators for each quorum. Within a quorum, // the operators are ordered; in case of rate limiting, the first operators will be ejected. - EjectOperators(ctx context.Context, operatorsByQuorum [][]OperatorID) (*types.Receipt, error) + BuildEjectOperatorsTxn(ctx context.Context, operatorsByQuorum [][]OperatorID) (*types.Transaction, error) // GetOperatorStakes returns the stakes of all operators within the quorums that the operator represented by operatorId // is registered with. The returned stakes are for the block number supplied. The indices of the operators within each quorum diff --git a/disperser/cmd/dataapi/config.go b/disperser/cmd/dataapi/config.go index db5c02712a..5bd369a7f9 100644 --- a/disperser/cmd/dataapi/config.go +++ b/disperser/cmd/dataapi/config.go @@ -2,6 +2,7 @@ package main import ( "errors" + "time" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/common/aws" @@ -17,6 +18,7 @@ type Config struct { AwsClientConfig aws.ClientConfig BlobstoreConfig blobstore.Config EthClientConfig geth.EthClientConfig + FireblocksConfig common.FireblocksConfig LoggerConfig common.LoggerConfig PrometheusConfig prometheus.Config MetricsConfig dataapi.MetricsConfig @@ -35,6 +37,9 @@ type Config struct { DisperserHostname string ChurnerHostname string BatcherHealthEndpt string + + FireblockAPITimeout time.Duration + TxnTimeout time.Duration } func NewConfig(ctx *cli.Context) (Config, error) { @@ -46,6 +51,7 @@ func NewConfig(ctx *cli.Context) (Config, error) { if len(ejectionToken) < 20 { return Config{}, errors.New("the ejection token length must be at least 20") } + fireblocksConfig := common.ReadFireblocksCLIConfig(ctx, flags.FlagPrefix) config := Config{ BlobstoreConfig: blobstore.Config{ BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name), @@ -53,6 +59,7 @@ func NewConfig(ctx *cli.Context) (Config, error) { }, AwsClientConfig: aws.ReadClientConfig(ctx, flags.FlagPrefix), EthClientConfig: geth.ReadEthClientConfig(ctx), + FireblocksConfig: fireblocksConfig, LoggerConfig: *loggerConfig, SocketAddr: ctx.GlobalString(flags.SocketAddrFlag.Name), SubgraphApiBatchMetadataAddr: ctx.GlobalString(flags.SubgraphApiBatchMetadataAddrFlag.Name), @@ -75,6 +82,9 @@ func NewConfig(ctx *cli.Context) (Config, error) { DisperserHostname: ctx.GlobalString(flags.DisperserHostnameFlag.Name), ChurnerHostname: ctx.GlobalString(flags.ChurnerHostnameFlag.Name), BatcherHealthEndpt: ctx.GlobalString(flags.BatcherHealthEndptFlag.Name), + + FireblockAPITimeout: ctx.GlobalDuration(flags.FireblockAPITimeoutFlag.Name), + TxnTimeout: ctx.GlobalDuration(flags.TxnTimeoutFlag.Name), } return config, nil } diff --git a/disperser/cmd/dataapi/flags/flags.go b/disperser/cmd/dataapi/flags/flags.go index efa7fdba9f..796979c559 100644 --- a/disperser/cmd/dataapi/flags/flags.go +++ b/disperser/cmd/dataapi/flags/flags.go @@ -1,6 +1,8 @@ package flags import ( + "time" + "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/common/aws" "github.com/Layr-Labs/eigenda/common/geth" @@ -137,6 +139,20 @@ var ( Value: "9100", EnvVar: common.PrefixEnvVar(envVarPrefix, "METRICS_HTTP_PORT"), } + FireblockAPITimeoutFlag = cli.DurationFlag{ + Name: common.PrefixFlag(FlagPrefix, "fireblocks-api-timeout"), + Usage: "the timeout for the fireblocks api", + Required: false, + Value: 3 * time.Minute, + EnvVar: common.PrefixEnvVar(envVarPrefix, "FIREBLOCKS_API_TIMEOUT"), + } + TxnTimeoutFlag = cli.DurationFlag{ + Name: common.PrefixFlag(FlagPrefix, "txn-timeout"), + Usage: "the timeout for the transaction", + Required: false, + Value: 6 * time.Minute, + EnvVar: common.PrefixEnvVar(envVarPrefix, "TRANSACTION_TIMEOUT"), + } ) var requiredFlags = []cli.Flag{ @@ -157,6 +173,8 @@ var requiredFlags = []cli.Flag{ DisperserHostnameFlag, ChurnerHostnameFlag, BatcherHealthEndptFlag, + FireblockAPITimeoutFlag, + TxnTimeoutFlag, } var optionalFlags = []cli.Flag{ @@ -171,5 +189,6 @@ func init() { Flags = append(requiredFlags, optionalFlags...) Flags = append(Flags, common.LoggerCLIFlags(envVarPrefix, FlagPrefix)...) Flags = append(Flags, geth.EthClientFlags(envVarPrefix)...) + Flags = append(Flags, common.FireblocksCLIFlags(envVarPrefix, FlagPrefix)...) Flags = append(Flags, aws.ClientFlags(envVarPrefix, FlagPrefix)...) } diff --git a/disperser/cmd/dataapi/main.go b/disperser/cmd/dataapi/main.go index 72437e480b..6464bd2c35 100644 --- a/disperser/cmd/dataapi/main.go +++ b/disperser/cmd/dataapi/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "errors" "fmt" "log" "os" @@ -11,6 +12,7 @@ import ( "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/common/aws/dynamodb" "github.com/Layr-Labs/eigenda/common/aws/s3" + "github.com/Layr-Labs/eigenda/common/aws/secretmanager" "github.com/Layr-Labs/eigenda/common/geth" coreeth "github.com/Layr-Labs/eigenda/core/eth" "github.com/Layr-Labs/eigenda/disperser/cmd/dataapi/flags" @@ -18,7 +20,13 @@ import ( "github.com/Layr-Labs/eigenda/disperser/dataapi" "github.com/Layr-Labs/eigenda/disperser/dataapi/prometheus" "github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph" + "github.com/Layr-Labs/eigensdk-go/chainio/clients/fireblocks" + walletsdk "github.com/Layr-Labs/eigensdk-go/chainio/clients/wallet" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/Layr-Labs/eigensdk-go/signerv2" + gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" "github.com/urfave/cli" ) @@ -86,6 +94,10 @@ func RunDataApi(ctx *cli.Context) error { return err } + wallet, err := getWallet(config, client, logger) + if err != nil { + return err + } var ( promClient = dataapi.NewPrometheusClient(promApi, config.PrometheusConfig.Cluster) blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, 0) @@ -109,6 +121,7 @@ func RunDataApi(ctx *cli.Context) error { subgraphClient, tx, chainState, + dataapi.NewEjector(wallet, client, logger, tx, metrics, config.TxnTimeout), logger, metrics, nil, @@ -147,3 +160,70 @@ func RunDataApi(ctx *cli.Context) error { return err } + +func getWallet(config Config, ethClient common.EthClient, logger logging.Logger) (walletsdk.Wallet, error) { + var wallet walletsdk.Wallet + if !config.FireblocksConfig.Disable { + validConfigflag := len(config.FireblocksConfig.APIKeyName) > 0 && + len(config.FireblocksConfig.SecretKeyName) > 0 && + len(config.FireblocksConfig.BaseURL) > 0 && + len(config.FireblocksConfig.VaultAccountName) > 0 && + len(config.FireblocksConfig.WalletAddress) > 0 && + len(config.FireblocksConfig.Region) > 0 + if !validConfigflag { + return nil, errors.New("fireblocks config is either invalid or incomplete") + } + apiKey, err := secretmanager.ReadStringFromSecretManager(context.Background(), config.FireblocksConfig.APIKeyName, config.FireblocksConfig.Region) + if err != nil { + return nil, fmt.Errorf("cannot read fireblocks api key %s from secret manager: %w", config.FireblocksConfig.APIKeyName, err) + } + secretKey, err := secretmanager.ReadStringFromSecretManager(context.Background(), config.FireblocksConfig.SecretKeyName, config.FireblocksConfig.Region) + if err != nil { + return nil, fmt.Errorf("cannot read fireblocks secret key %s from secret manager: %w", config.FireblocksConfig.SecretKeyName, err) + } + fireblocksClient, err := fireblocks.NewClient( + apiKey, + []byte(secretKey), + config.FireblocksConfig.BaseURL, + config.FireblockAPITimeout, + logger.With("component", "FireblocksClient"), + ) + if err != nil { + return nil, err + } + wallet, err = walletsdk.NewFireblocksWallet(fireblocksClient, ethClient, config.FireblocksConfig.VaultAccountName, logger.With("component", "FireblocksWallet")) + if err != nil { + return nil, err + } + sender, err := wallet.SenderAddress(context.Background()) + if err != nil { + return nil, err + } + if sender.Cmp(gethcommon.HexToAddress(config.FireblocksConfig.WalletAddress)) != 0 { + return nil, fmt.Errorf("configured wallet address %s does not match derived address %s", config.FireblocksConfig.WalletAddress, sender.Hex()) + } + logger.Info("Initialized Fireblocks wallet", "vaultAccountName", config.FireblocksConfig.VaultAccountName, "address", sender.Hex()) + } else if len(config.EthClientConfig.PrivateKeyString) > 0 { + privateKey, err := crypto.HexToECDSA(config.EthClientConfig.PrivateKeyString) + if err != nil { + return nil, fmt.Errorf("failed to parse private key: %w", err) + } + chainID, err := ethClient.ChainID(context.Background()) + if err != nil { + return nil, fmt.Errorf("failed to get chain ID: %w", err) + } + signerV2, address, err := signerv2.SignerFromConfig(signerv2.Config{PrivateKey: privateKey}, chainID) + if err != nil { + return nil, err + } + wallet, err = walletsdk.NewPrivateKeyWallet(ethClient, signerV2, address, logger.With("component", "PrivateKeyWallet")) + if err != nil { + return nil, err + } + logger.Info("Initialized PrivateKey wallet", "address", address.Hex()) + } else { + return nil, errors.New("no wallet is configured. Either Fireblocks or PrivateKey wallet should be configured") + } + + return wallet, nil +} diff --git a/disperser/dataapi/ejector.go b/disperser/dataapi/ejector.go index dbf1c97d82..2d271ad600 100644 --- a/disperser/dataapi/ejector.go +++ b/disperser/dataapi/ejector.go @@ -2,11 +2,25 @@ package dataapi import ( "context" + "errors" + "fmt" + "math/big" + "net/url" "sort" "sync" + "time" + "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/core" + walletsdk "github.com/Layr-Labs/eigensdk-go/chainio/clients/wallet" "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/core/types" +) + +const ( + maxSendTransactionRetry = 3 + queryTickerDuration = 3 * time.Second ) // stakeShareToSLA returns the SLA for a given stake share in a quorum. @@ -39,24 +53,30 @@ func computePerfScore(metric *OperatorNonsigningPercentageMetrics) float64 { return operatorPerfScore(metric.StakePercentage, metric.Percentage) } -type ejector struct { +type Ejector struct { + wallet walletsdk.Wallet + ethClient common.EthClient logger logging.Logger transactor core.Transactor metrics *Metrics + txnTimeout time.Duration // For serializing the ejection requests. mu sync.Mutex } -func newEjector(logger logging.Logger, tx core.Transactor, metrics *Metrics) *ejector { - return &ejector{ +func NewEjector(wallet walletsdk.Wallet, ethClient common.EthClient, logger logging.Logger, tx core.Transactor, metrics *Metrics, txnTimeout time.Duration) *Ejector { + return &Ejector{ + wallet: wallet, + ethClient: ethClient, logger: logger.With("component", "Ejector"), transactor: tx, metrics: metrics, + txnTimeout: txnTimeout, } } -func (e *ejector) eject(ctx context.Context, nonsigningRate *OperatorsNonsigningPercentage) error { +func (e *Ejector) Eject(ctx context.Context, nonsigningRate *OperatorsNonsigningPercentage) error { e.mu.Lock() defer e.mu.Unlock() @@ -86,11 +106,73 @@ func (e *ejector) eject(ctx context.Context, nonsigningRate *OperatorsNonsigning return err } - receipt, err := e.transactor.EjectOperators(ctx, operatorsByQuorum) + txn, err := e.transactor.BuildEjectOperatorsTxn(ctx, operatorsByQuorum) if err != nil { - e.logger.Error("Ejection transaction failed", "err", err) + e.logger.Error("Failed to build ejection transaction", "err", err) return err } + + var txID walletsdk.TxID + retryFromFailure := 0 + for retryFromFailure < maxSendTransactionRetry { + gasTipCap, gasFeeCap, err := e.ethClient.GetLatestGasCaps(ctx) + if err != nil { + return fmt.Errorf("failed to get latest gas caps: %w", err) + } + + txn, err = e.ethClient.UpdateGas(ctx, txn, big.NewInt(0), gasTipCap, gasFeeCap) + if err != nil { + return fmt.Errorf("failed to update gas price: %w", err) + } + txID, err = e.wallet.SendTransaction(ctx, txn) + var urlErr *url.Error + didTimeout := false + if errors.As(err, &urlErr) { + didTimeout = urlErr.Timeout() + } + if didTimeout || errors.Is(err, context.DeadlineExceeded) { + e.logger.Warn("failed to send txn due to timeout", "hash", txn.Hash().Hex(), "numRetries", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err) + retryFromFailure++ + continue + } else if err != nil { + return fmt.Errorf("failed to send txn %s: %w", txn.Hash().Hex(), err) + } else { + e.logger.Debug("successfully sent txn", "txID", txID, "txHash", txn.Hash().Hex()) + break + } + } + + queryTicker := time.NewTicker(queryTickerDuration) + defer queryTicker.Stop() + ctxWithTimeout, cancelCtx := context.WithTimeout(ctx, e.txnTimeout) + defer cancelCtx() + var receipt *types.Receipt + for { + receipt, err = e.wallet.GetTransactionReceipt(ctx, txID) + if err == nil { + break + } + + if errors.Is(err, ethereum.NotFound) || errors.Is(err, walletsdk.ErrReceiptNotYetAvailable) { + e.logger.Debug("Transaction not yet mined", "txID", txID, "txHash", txn.Hash().Hex(), "err", err) + } else if errors.Is(err, walletsdk.ErrNotYetBroadcasted) { + e.logger.Warn("Transaction has not been broadcasted to network but attempted to retrieve receipt", "err", err) + } else if errors.Is(err, walletsdk.ErrTransactionFailed) { + e.logger.Error("Transaction failed", "txID", txID, "txHash", txn.Hash().Hex(), "err", err) + return err + } else { + e.logger.Error("Transaction receipt retrieval failed", "err", err) + return err + } + + // Wait for the next round. + select { + case <-ctx.Done(): + return ctxWithTimeout.Err() + case <-queryTicker.C: + } + } + e.logger.Info("Ejection transaction succeeded", "receipt", receipt) e.metrics.UpdateEjectionGasUsed(receipt.GasUsed) @@ -100,7 +182,7 @@ func (e *ejector) eject(ctx context.Context, nonsigningRate *OperatorsNonsigning return nil } -func (e *ejector) convertOperators(nonsigners []*OperatorNonsigningPercentageMetrics) ([][]core.OperatorID, error) { +func (e *Ejector) convertOperators(nonsigners []*OperatorNonsigningPercentageMetrics) ([][]core.OperatorID, error) { var maxQuorumId uint8 for _, metric := range nonsigners { if metric.QuorumId > maxQuorumId { diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index 8d730d93b2..7b49a94791 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -153,7 +153,7 @@ type ( subgraphClient SubgraphClient transactor core.Transactor chainState core.ChainState - ejector *ejector + ejector *Ejector ejectionToken string metrics *Metrics @@ -172,6 +172,7 @@ func NewServer( subgraphClient SubgraphClient, transactor core.Transactor, chainState core.ChainState, + ejector *Ejector, logger logging.Logger, metrics *Metrics, grpcConn GRPCConn, @@ -193,8 +194,6 @@ func NewServer( eigenDAHttpServiceChecker = &HttpServiceAvailability{} } - ejector := newEjector(logger, transactor, metrics) - return &server{ logger: logger.With("component", "DataAPIServer"), serverMode: config.ServerMode, @@ -350,7 +349,7 @@ func (s *server) EjectOperatorsHandler(c *gin.Context) { nonSigningRate, err := s.getOperatorNonsigningRate(c.Request.Context(), endTime.Unix()-interval, endTime.Unix(), true) if err == nil { - err = s.ejector.eject(c.Request.Context(), nonSigningRate) + err = s.ejector.Eject(c.Request.Context(), nonSigningRate) } if err != nil { s.metrics.IncrementFailedRequestNum("EjectOperators") diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go index 7082fbda45..61b658ddd5 100644 --- a/disperser/dataapi/server_test.go +++ b/disperser/dataapi/server_test.go @@ -15,6 +15,7 @@ import ( "testing" "time" + commonmock "github.com/Layr-Labs/eigenda/common/mock" "github.com/Layr-Labs/eigenda/core" coremock "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/disperser" @@ -24,6 +25,7 @@ import ( "github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph" subgraphmock "github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph/mock" "github.com/Layr-Labs/eigenda/encoding" + sdkmock "github.com/Layr-Labs/eigensdk-go/chainio/clients/mocks" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/consensys/gnark-crypto/ecc/bn254/fp" "github.com/ethereum/go-ethereum/common" @@ -34,6 +36,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "go.uber.org/goleak" + "go.uber.org/mock/gomock" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" ) @@ -55,6 +58,7 @@ var ( config = dataapi.Config{ServerMode: "test", SocketAddr: ":8080", AllowOrigins: []string{"*"}, DisperserHostname: "localhost:32007", ChurnerHostname: "localhost:32009", EjectionToken: "deadbeef"} mockTx = &coremock.MockTransactor{} + metrics = dataapi.NewMetrics(nil, "9001", mockLogger) opId0, _ = core.OperatorIDFromHex("e22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311") opId1, _ = core.OperatorIDFromHex("e23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568312") mockChainState, _ = coremock.NewChainDataMock(map[uint8]map[core.OperatorID]int{ @@ -67,7 +71,7 @@ var ( opId1: 3, }, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockTx, mockChainState, nil, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) expectedBatchHeaderHash = [32]byte{1, 2, 3} expectedBlobIndex = uint32(1) expectedRequestedAt = uint64(5567830000000000000) @@ -326,21 +330,32 @@ func TestFetchMetricsThroughputHandler(t *testing.T) { func TestEjectOperatorHandler(t *testing.T) { r := setUpRouter() + ejectorComponents := getEjector(t) + testDataApiServer := dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, ejectorComponents.ejector, mockLogger, metrics, &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: true}) - stopTime := time.Now() + stopTime := time.Now().UTC() interval := 3600 startTime := stopTime.Add(-time.Duration(interval) * time.Second) - mockSubgraphApi.On("QueryBatchNonSigningInfo", startTime.Unix(), stopTime.Unix()).Return(batchNonSigningInfo, nil) addr1 := gethcommon.HexToAddress("0x00000000219ab540356cbb839cbe05303d7705fa") addr2 := gethcommon.HexToAddress("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2") mockTx.On("BatchOperatorIDToAddress").Return([]gethcommon.Address{addr1, addr2}, nil) mockTx.On("GetQuorumBitmapForOperatorsAtBlockNumber").Return([]*big.Int{big.NewInt(3), big.NewInt(0)}, nil) + mockTx.On("BuildEjectOperatorsTxn").Return(types.NewTransaction(0, gethcommon.HexToAddress("0x1"), big.NewInt(0), 0, big.NewInt(0), []byte{}), nil) mockTx.On("EjectOperators").Return(&types.Receipt{ GasUsed: uint64(10), }, nil) mockSubgraphApi.On("QueryOperatorAddedToQuorum").Return(operatorAddedToQuorum, nil) mockSubgraphApi.On("QueryOperatorRemovedFromQuorum").Return(operatorRemovedFromQuorum, nil) + ejectorComponents.ethClient.On("GetLatestGasCaps").Return(big.NewInt(0), big.NewInt(0), nil) + ejectorComponents.ethClient.On("UpdateGas").Return(types.NewTransaction(0, gethcommon.HexToAddress("0x1"), big.NewInt(0), 0, big.NewInt(0), []byte{}), nil) + txID := "1234" + gomock.InOrder( + ejectorComponents.wallet.EXPECT().SendTransaction(gomock.Any(), gomock.Any()).Return(txID, nil), + ejectorComponents.wallet.EXPECT().GetTransactionReceipt(gomock.Any(), gomock.Any()).Return(&types.Receipt{ + BlockNumber: new(big.Int).SetUint64(1), + }, nil), + ) r.GET("/v1/ejector/operator", testDataApiServer.EjectOperatorsHandler) @@ -366,7 +381,7 @@ func TestEjectOperatorHandler(t *testing.T) { func TestFetchUnsignedBatchesHandler(t *testing.T) { r := setUpRouter() - stopTime := time.Now() + stopTime := time.Now().UTC() interval := 3600 startTime := stopTime.Add(-time.Duration(interval) * time.Second) @@ -422,10 +437,27 @@ func TestFetchUnsignedBatchesHandler(t *testing.T) { assert.Equal(t, float64(25), responseData.StakePercentage) } +type ejectorComponents struct { + wallet *sdkmock.MockWallet + ethClient *commonmock.MockEthClient + ejector *dataapi.Ejector +} + +func getEjector(t *testing.T) *ejectorComponents { + ctrl := gomock.NewController(t) + w := sdkmock.NewMockWallet(ctrl) + ethClient := &commonmock.MockEthClient{} + ejector := dataapi.NewEjector(w, ethClient, mockLogger, mockTx, metrics, 100*time.Millisecond) + return &ejectorComponents{ + wallet: w, + ethClient: ethClient, + ejector: ejector, + } +} + func TestCheckBatcherHealthExpectServing(t *testing.T) { r := setUpRouter() - - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: true}) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: true}) r.GET("/v1/metrics/batcher-service-availability", testDataApiServer.FetchBatcherAvailability) @@ -458,7 +490,7 @@ func TestCheckBatcherHealthExpectServing(t *testing.T) { func TestCheckBatcherHealthExpectNotServing(t *testing.T) { r := setUpRouter() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: false}) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: false}) r.GET("/v1/metrics/batcher-service-availability", testDataApiServer.FetchBatcherAvailability) @@ -496,7 +528,7 @@ func TestFetchDisperserServiceAvailabilityHandler(t *testing.T) { Status: grpc_health_v1.HealthCheckResponse_SERVING, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, mockHealthCheckService, nil) r.GET("/v1/metrics/disperser-service-availability", testDataApiServer.FetchDisperserServiceAvailability) @@ -534,7 +566,7 @@ func TestChurnerServiceAvailabilityHandler(t *testing.T) { Status: grpc_health_v1.HealthCheckResponse_SERVING, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, mockHealthCheckService, nil) r.GET("/v1/metrics/churner-service-availability", testDataApiServer.FetchChurnerServiceAvailability) @@ -578,7 +610,7 @@ func TestFetchDeregisteredOperatorNoSocketInfoOneOperatorHandler(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfoNoSocketInfo, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -627,7 +659,7 @@ func TestFetchDeregisteredMultipleOperatorsOneWithNoSocketInfoHandler(t *testing // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfoNoSocketInfo, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -695,7 +727,7 @@ func TestFetchDeregisteredOperatorInfoInvalidTimeStampHandler(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -740,7 +772,7 @@ func TestFetchDeregisteredOperatorInfoInvalidTimeStampTwoOperatorsHandler(t *tes // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -796,7 +828,7 @@ func TestFetchMetricsDeregisteredOperatorHandler(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -861,7 +893,7 @@ func TestFetchDeregisteredOperatorOffline(t *testing.T) { mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistereds, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) @@ -914,7 +946,7 @@ func TestFetchDeregisteredOperatorsWithoutDaysQueryParam(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -972,7 +1004,7 @@ func TestFetchDeregisteredOperatorInvalidDaysQueryParam(t *testing.T) { mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistereds, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -1014,7 +1046,7 @@ func TestFetchDeregisteredOperatorQueryDaysGreaterThan30(t *testing.T) { mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistereds, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) @@ -1060,7 +1092,7 @@ func TestFetchDeregisteredOperatorsMultipleOffline(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -1118,7 +1150,7 @@ func TestFetchDeregisteredOperatorOnline(t *testing.T) { mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistereds, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) @@ -1174,7 +1206,7 @@ func TestFetchDeregisteredOperatorsMultipleOfflineOnline(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -1241,7 +1273,7 @@ func TestFetchDeregisteredOperatorsMultipleOnline(t *testing.T) { mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphTwoOperatorsDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -1316,7 +1348,7 @@ func TestFetchDeregisteredOperatorsMultipleOfflineSameBlock(t *testing.T) { mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo3, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil)