From c044a7ab4990417ff28f489ebdd80d995a8b414b Mon Sep 17 00:00:00 2001 From: Ian Shim Date: Fri, 26 Apr 2024 11:29:02 -0700 Subject: [PATCH] move txn manager out of batcher --- .../batcher => common}/mock/txn_manager.go | 12 +-- {disperser/batcher => common}/txn_manager.go | 7 +- common/txn_manager_metrics.go | 77 +++++++++++++++++++ .../batcher => common}/txn_manager_test.go | 35 +++++---- disperser/batcher/batcher.go | 8 +- disperser/batcher/batcher_test.go | 17 ++-- disperser/batcher/metrics.go | 74 +----------------- disperser/cmd/batcher/main.go | 2 +- test/integration_test.go | 6 +- 9 files changed, 124 insertions(+), 114 deletions(-) rename {disperser/batcher => common}/mock/txn_manager.go (59%) rename {disperser/batcher => common}/txn_manager.go (97%) create mode 100644 common/txn_manager_metrics.go rename {disperser/batcher => common}/txn_manager_test.go (87%) diff --git a/disperser/batcher/mock/txn_manager.go b/common/mock/txn_manager.go similarity index 59% rename from disperser/batcher/mock/txn_manager.go rename to common/mock/txn_manager.go index cec8cab703..011bd4f003 100644 --- a/disperser/batcher/mock/txn_manager.go +++ b/common/mock/txn_manager.go @@ -3,17 +3,17 @@ package mock import ( "context" - "github.com/Layr-Labs/eigenda/disperser/batcher" + "github.com/Layr-Labs/eigenda/common" "github.com/stretchr/testify/mock" ) type MockTxnManager struct { mock.Mock - Requests []*batcher.TxnRequest + Requests []*common.TxnRequest } -var _ batcher.TxnManager = (*MockTxnManager)(nil) +var _ common.TxnManager = (*MockTxnManager)(nil) func NewTxnManager() *MockTxnManager { return &MockTxnManager{} @@ -21,13 +21,13 @@ func NewTxnManager() *MockTxnManager { func (b *MockTxnManager) Start(ctx context.Context) {} -func (b *MockTxnManager) ProcessTransaction(ctx context.Context, req *batcher.TxnRequest) error { +func (b *MockTxnManager) ProcessTransaction(ctx context.Context, req *common.TxnRequest) error { args := b.Called() b.Requests = append(b.Requests, req) return args.Error(0) } -func (b *MockTxnManager) ReceiptChan() chan *batcher.ReceiptOrErr { +func (b *MockTxnManager) ReceiptChan() chan *common.ReceiptOrErr { args := b.Called() - return args.Get(0).(chan *batcher.ReceiptOrErr) + return args.Get(0).(chan *common.ReceiptOrErr) } diff --git a/disperser/batcher/txn_manager.go b/common/txn_manager.go similarity index 97% rename from disperser/batcher/txn_manager.go rename to common/txn_manager.go index f86757889b..a6346addbd 100644 --- a/disperser/batcher/txn_manager.go +++ b/common/txn_manager.go @@ -1,4 +1,4 @@ -package batcher +package common import ( "context" @@ -9,7 +9,6 @@ import ( "sync" "time" - "github.com/Layr-Labs/eigenda/common" walletsdk "github.com/Layr-Labs/eigensdk-go/chainio/clients/wallet" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/ethereum/go-ethereum" @@ -67,7 +66,7 @@ type ReceiptOrErr struct { type txnManager struct { mu sync.Mutex - ethClient common.EthClient + ethClient EthClient wallet walletsdk.Wallet numConfirmations int requestChan chan *TxnRequest @@ -82,7 +81,7 @@ type txnManager struct { var _ TxnManager = (*txnManager)(nil) -func NewTxnManager(ethClient common.EthClient, wallet walletsdk.Wallet, numConfirmations, queueSize int, txnBroadcastTimeout time.Duration, txnRefreshInterval time.Duration, logger logging.Logger, metrics *TxnManagerMetrics) TxnManager { +func NewTxnManager(ethClient EthClient, wallet walletsdk.Wallet, numConfirmations, queueSize int, txnBroadcastTimeout time.Duration, txnRefreshInterval time.Duration, logger logging.Logger, metrics *TxnManagerMetrics) TxnManager { return &txnManager{ ethClient: ethClient, wallet: wallet, diff --git a/common/txn_manager_metrics.go b/common/txn_manager_metrics.go new file mode 100644 index 0000000000..f81bfcf06a --- /dev/null +++ b/common/txn_manager_metrics.go @@ -0,0 +1,77 @@ +package common + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type TxnManagerMetrics struct { + Latency *prometheus.SummaryVec + GasUsed prometheus.Gauge + SpeedUps prometheus.Gauge + TxQueue prometheus.Gauge + NumTx *prometheus.CounterVec +} + +func NewTxnManagerMetrics(namespace string, reg *prometheus.Registry) *TxnManagerMetrics { + return &TxnManagerMetrics{ + Latency: promauto.With(reg).NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Name: "txn_manager_latency_ms", + Help: "transaction confirmation latency summary in milliseconds", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001}, + }, + []string{"stage"}, + ), + GasUsed: promauto.With(reg).NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "gas_used", + Help: "gas used for onchain batch confirmation", + }, + ), + SpeedUps: promauto.With(reg).NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "speed_ups", + Help: "number of times the gas price was increased", + }, + ), + TxQueue: promauto.With(reg).NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "tx_queue", + Help: "number of transactions in transaction queue", + }, + ), + NumTx: promauto.With(reg).NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "tx_total", + Help: "number of transactions processed", + }, + []string{"state"}, + ), + } +} + +func (t *TxnManagerMetrics) ObserveLatency(stage string, latencyMs float64) { + t.Latency.WithLabelValues(stage).Observe(latencyMs) +} + +func (t *TxnManagerMetrics) UpdateGasUsed(gasUsed uint64) { + t.GasUsed.Set(float64(gasUsed)) +} + +func (t *TxnManagerMetrics) UpdateSpeedUps(speedUps int) { + t.SpeedUps.Set(float64(speedUps)) +} + +func (t *TxnManagerMetrics) UpdateTxQueue(txQueue int) { + t.TxQueue.Set(float64(txQueue)) +} + +func (t *TxnManagerMetrics) IncrementTxnCount(state string) { + t.NumTx.WithLabelValues(state).Inc() +} diff --git a/disperser/batcher/txn_manager_test.go b/common/txn_manager_test.go similarity index 87% rename from disperser/batcher/txn_manager_test.go rename to common/txn_manager_test.go index c3886898e4..806e766418 100644 --- a/disperser/batcher/txn_manager_test.go +++ b/common/txn_manager_test.go @@ -1,4 +1,4 @@ -package batcher_test +package common_test import ( "context" @@ -7,6 +7,7 @@ import ( "testing" "time" + dacommon "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/common/mock" "github.com/Layr-Labs/eigenda/disperser/batcher" sdkmock "github.com/Layr-Labs/eigensdk-go/chainio/clients/mocks" @@ -24,7 +25,7 @@ func TestProcessTransaction(t *testing.T) { w := sdkmock.NewMockWallet(ctrl) logger := logging.NewNoopLogger() metrics := batcher.NewMetrics("9100", logger) - txnManager := batcher.NewTxnManager(ethClient, w, 0, 5, 100*time.Millisecond, 100*time.Millisecond, logger, metrics.TxnManagerMetrics) + txnManager := dacommon.NewTxnManager(ethClient, w, 0, 5, 100*time.Millisecond, 100*time.Millisecond, logger, metrics.TxnManagerMetrics) ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) defer cancel() txnManager.Start(ctx) @@ -40,7 +41,7 @@ func TestProcessTransaction(t *testing.T) { }, nil).Times(2), ) - err := txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{ + err := txnManager.ProcessTransaction(ctx, &dacommon.TxnRequest{ Tx: txn, Tag: "test transaction", Value: nil, @@ -56,7 +57,7 @@ func TestProcessTransaction(t *testing.T) { w.EXPECT().GetTransactionReceipt(gomock.Any(), gomock.Any()).Return(nil, randomErr).AnyTimes() w.EXPECT().SendTransaction(gomock.Any(), gomock.Any()).Return("", randomErr).AnyTimes() - err = txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{ + err = txnManager.ProcessTransaction(ctx, &dacommon.TxnRequest{ Tx: txn, Tag: "test transaction", Value: nil, @@ -74,7 +75,7 @@ func TestReplaceGasFee(t *testing.T) { w := sdkmock.NewMockWallet(ctrl) logger := logging.NewNoopLogger() metrics := batcher.NewMetrics("9100", logger) - txnManager := batcher.NewTxnManager(ethClient, w, 0, 5, 100*time.Millisecond, 100*time.Millisecond, logger, metrics.TxnManagerMetrics) + txnManager := dacommon.NewTxnManager(ethClient, w, 0, 5, 100*time.Millisecond, 100*time.Millisecond, logger, metrics.TxnManagerMetrics) ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) defer cancel() txnManager.Start(ctx) @@ -93,7 +94,7 @@ func TestReplaceGasFee(t *testing.T) { BlockNumber: new(big.Int).SetUint64(1), }, nil) - err := txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{ + err := txnManager.ProcessTransaction(ctx, &dacommon.TxnRequest{ Tx: txn, Tag: "test transaction", Value: nil, @@ -110,7 +111,7 @@ func TestTransactionReplacementFailure(t *testing.T) { w := sdkmock.NewMockWallet(ctrl) logger := logging.NewNoopLogger() metrics := batcher.NewMetrics("9100", logger) - txnManager := batcher.NewTxnManager(ethClient, w, 0, 5, time.Second, 48*time.Second, logger, metrics.TxnManagerMetrics) + txnManager := dacommon.NewTxnManager(ethClient, w, 0, 5, time.Second, 48*time.Second, logger, metrics.TxnManagerMetrics) ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) defer cancel() txnManager.Start(ctx) @@ -126,7 +127,7 @@ func TestTransactionReplacementFailure(t *testing.T) { w.EXPECT().SendTransaction(gomock.Any(), gomock.Any()).Return(badTxID, nil) w.EXPECT().GetTransactionReceipt(gomock.Any(), badTxID).Return(nil, errors.New("blah")).AnyTimes() - err := txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{ + err := txnManager.ProcessTransaction(ctx, &dacommon.TxnRequest{ Tx: txn, Tag: "test transaction", Value: nil, @@ -143,7 +144,7 @@ func TestSendTransactionReceiptRetry(t *testing.T) { w := sdkmock.NewMockWallet(ctrl) logger := logging.NewNoopLogger() metrics := batcher.NewMetrics("9100", logger) - txnManager := batcher.NewTxnManager(ethClient, w, 0, 5, time.Second, 48*time.Second, logger, metrics.TxnManagerMetrics) + txnManager := dacommon.NewTxnManager(ethClient, w, 0, 5, time.Second, 48*time.Second, logger, metrics.TxnManagerMetrics) ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) defer cancel() txnManager.Start(ctx) @@ -161,7 +162,7 @@ func TestSendTransactionReceiptRetry(t *testing.T) { BlockNumber: new(big.Int).SetUint64(1), }, nil) - err := txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{ + err := txnManager.ProcessTransaction(ctx, &dacommon.TxnRequest{ Tx: txn, Tag: "test transaction", Value: nil, @@ -181,7 +182,7 @@ func TestSendTransactionRetrySuccess(t *testing.T) { w := sdkmock.NewMockWallet(ctrl) logger := logging.NewNoopLogger() metrics := batcher.NewMetrics("9100", logger) - txnManager := batcher.NewTxnManager(ethClient, w, 0, 5, time.Second, 48*time.Second, logger, metrics.TxnManagerMetrics) + txnManager := dacommon.NewTxnManager(ethClient, w, 0, 5, time.Second, 48*time.Second, logger, metrics.TxnManagerMetrics) ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) defer cancel() txnManager.Start(ctx) @@ -203,7 +204,7 @@ func TestSendTransactionRetrySuccess(t *testing.T) { BlockNumber: new(big.Int).SetUint64(1), }, nil) - err := txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{ + err := txnManager.ProcessTransaction(ctx, &dacommon.TxnRequest{ Tx: txn, Tag: "test transaction", Value: nil, @@ -223,7 +224,7 @@ func TestSendTransactionRetryFailure(t *testing.T) { w := sdkmock.NewMockWallet(ctrl) logger := logging.NewNoopLogger() metrics := batcher.NewMetrics("9100", logger) - txnManager := batcher.NewTxnManager(ethClient, w, 0, 5, time.Second, 48*time.Second, logger, metrics.TxnManagerMetrics) + txnManager := dacommon.NewTxnManager(ethClient, w, 0, 5, time.Second, 48*time.Second, logger, metrics.TxnManagerMetrics) ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) defer cancel() txnManager.Start(ctx) @@ -241,7 +242,7 @@ func TestSendTransactionRetryFailure(t *testing.T) { // assume that the transaction is not mined within the timeout w.EXPECT().GetTransactionReceipt(gomock.Any(), txID).Return(nil, walletsdk.ErrReceiptNotYetAvailable).AnyTimes() - err := txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{ + err := txnManager.ProcessTransaction(ctx, &dacommon.TxnRequest{ Tx: txn, Tag: "test transaction", Value: nil, @@ -261,7 +262,7 @@ func TestTransactionNotBroadcasted(t *testing.T) { w := sdkmock.NewMockWallet(ctrl) logger := logging.NewNoopLogger() metrics := batcher.NewMetrics("9100", logger) - txnManager := batcher.NewTxnManager(ethClient, w, 0, 5, 100*time.Millisecond, 48*time.Second, logger, metrics.TxnManagerMetrics) + txnManager := dacommon.NewTxnManager(ethClient, w, 0, 5, 100*time.Millisecond, 48*time.Second, logger, metrics.TxnManagerMetrics) ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) defer cancel() txnManager.Start(ctx) @@ -275,7 +276,7 @@ func TestTransactionNotBroadcasted(t *testing.T) { // assume that the transaction does not get broadcasted to the network w.EXPECT().GetTransactionReceipt(gomock.Any(), txID).Return(nil, walletsdk.ErrNotYetBroadcasted).AnyTimes() - err := txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{ + err := txnManager.ProcessTransaction(ctx, &dacommon.TxnRequest{ Tx: txn, Tag: "test transaction", Value: nil, @@ -283,6 +284,6 @@ func TestTransactionNotBroadcasted(t *testing.T) { <-ctx.Done() assert.NoError(t, err) res := <-txnManager.ReceiptChan() - assert.ErrorAs(t, res.Err, &batcher.ErrTransactionNotBroadcasted) + assert.ErrorAs(t, res.Err, &dacommon.ErrTransactionNotBroadcasted) assert.Nil(t, res.Receipt) } diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index 5432ae283e..120719da50 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -80,7 +80,7 @@ type Batcher struct { Aggregator core.SignatureAggregator EncodingStreamer *EncodingStreamer Transactor core.Transactor - TransactionManager TxnManager + TransactionManager common.TxnManager Metrics *Metrics HeartbeatChan chan time.Time @@ -101,7 +101,7 @@ func NewBatcher( ethClient common.EthClient, finalizer Finalizer, transactor core.Transactor, - txnManager TxnManager, + txnManager common.TxnManager, logger logging.Logger, metrics *Metrics, heartbeatChan chan time.Time, @@ -319,7 +319,7 @@ func (b *Batcher) updateConfirmationInfo( return blobsToRetry, nil } -func (b *Batcher) ProcessConfirmedBatch(ctx context.Context, receiptOrErr *ReceiptOrErr) error { +func (b *Batcher) ProcessConfirmedBatch(ctx context.Context, receiptOrErr *common.ReceiptOrErr) error { if receiptOrErr.Metadata == nil { return errors.New("failed to process confirmed batch: no metadata from transaction manager response") } @@ -492,7 +492,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error { _ = b.handleFailure(ctx, batch.BlobMetadata, FailConfirmBatch) return fmt.Errorf("HandleSingleBatch: error building confirmBatch transaction: %w", err) } - err = b.TransactionManager.ProcessTransaction(ctx, NewTxnRequest(txn, "confirmBatch", big.NewInt(0), confirmationMetadata{ + err = b.TransactionManager.ProcessTransaction(ctx, common.NewTxnRequest(txn, "confirmBatch", big.NewInt(0), confirmationMetadata{ batchHeader: batch.BatchHeader, blobs: batch.BlobMetadata, blobHeaders: batch.BlobHeaders, diff --git a/disperser/batcher/batcher_test.go b/disperser/batcher/batcher_test.go index 2c15d8cd54..8c0ab2fb20 100644 --- a/disperser/batcher/batcher_test.go +++ b/disperser/batcher/batcher_test.go @@ -18,7 +18,6 @@ import ( coremock "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/disperser" bat "github.com/Layr-Labs/eigenda/disperser/batcher" - "github.com/Layr-Labs/eigenda/disperser/batcher/mock" batchermock "github.com/Layr-Labs/eigenda/disperser/batcher/mock" "github.com/Layr-Labs/eigenda/disperser/common/inmem" dmock "github.com/Layr-Labs/eigenda/disperser/mock" @@ -38,7 +37,7 @@ var ( type batcherComponents struct { transactor *coremock.MockTransactor - txnManager *batchermock.MockTxnManager + txnManager *cmock.MockTxnManager blobStore disperser.BlobStore encoderClient *disperser.LocalEncoderClient encodingStreamer *bat.EncodingStreamer @@ -121,7 +120,7 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time. encoderClient := disperser.NewLocalEncoderClient(p) finalizer := batchermock.NewFinalizer() ethClient := &cmock.MockEthClient{} - txnManager := mock.NewTxnManager() + txnManager := cmock.NewTxnManager() b, err := bat.NewBatcher(config, timeoutConfig, blobStore, dispatcher, cst, asgn, encoderClient, agg, ethClient, finalizer, transactor, txnManager, logger, metrics, handleBatchLivenessChan) assert.NoError(t, err) @@ -228,7 +227,7 @@ func TestBatcherIterations(t *testing.T) { err = batcher.HandleSingleBatch(ctx) assert.NoError(t, err) assert.Greater(t, len(components.txnManager.Requests), 0) - err = batcher.ProcessConfirmedBatch(ctx, &bat.ReceiptOrErr{ + err = batcher.ProcessConfirmedBatch(ctx, &common.ReceiptOrErr{ Receipt: receipt, Err: nil, Metadata: components.txnManager.Requests[len(components.txnManager.Requests)-1].Metadata, @@ -304,7 +303,7 @@ func TestBlobFailures(t *testing.T) { err = batcher.HandleSingleBatch(ctx) assert.NoError(t, err) assert.Greater(t, len(components.txnManager.Requests), 0) - err = batcher.ProcessConfirmedBatch(ctx, &bat.ReceiptOrErr{ + err = batcher.ProcessConfirmedBatch(ctx, &common.ReceiptOrErr{ Receipt: nil, Err: confirmationErr, Metadata: components.txnManager.Requests[len(components.txnManager.Requests)-1].Metadata, @@ -333,7 +332,7 @@ func TestBlobFailures(t *testing.T) { components.encodingStreamer.ReferenceBlockNumber = 10 err = batcher.HandleSingleBatch(ctx) assert.NoError(t, err) - err = batcher.ProcessConfirmedBatch(ctx, &bat.ReceiptOrErr{ + err = batcher.ProcessConfirmedBatch(ctx, &common.ReceiptOrErr{ Receipt: &types.Receipt{ TxHash: gethcommon.HexToHash("0x1234"), }, @@ -358,7 +357,7 @@ func TestBlobFailures(t *testing.T) { err = batcher.HandleSingleBatch(ctx) assert.NoError(t, err) - err = batcher.ProcessConfirmedBatch(ctx, &bat.ReceiptOrErr{ + err = batcher.ProcessConfirmedBatch(ctx, &common.ReceiptOrErr{ Receipt: &types.Receipt{ TxHash: gethcommon.HexToHash("0x1234"), }, @@ -453,7 +452,7 @@ func TestBlobRetry(t *testing.T) { // Trigger a retry confirmationErr := errors.New("error") - err = batcher.ProcessConfirmedBatch(ctx, &bat.ReceiptOrErr{ + err = batcher.ProcessConfirmedBatch(ctx, &common.ReceiptOrErr{ Receipt: nil, Err: confirmationErr, Metadata: components.txnManager.Requests[len(components.txnManager.Requests)-1].Metadata, @@ -539,7 +538,7 @@ func TestRetryTxnReceipt(t *testing.T) { err = batcher.HandleSingleBatch(ctx) assert.NoError(t, err) - err = batcher.ProcessConfirmedBatch(ctx, &bat.ReceiptOrErr{ + err = batcher.ProcessConfirmedBatch(ctx, &common.ReceiptOrErr{ Receipt: invalidReceipt, Err: nil, Metadata: components.txnManager.Requests[len(components.txnManager.Requests)-1].Metadata, diff --git a/disperser/batcher/metrics.go b/disperser/batcher/metrics.go index 9939458c81..005e30c069 100644 --- a/disperser/batcher/metrics.go +++ b/disperser/batcher/metrics.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" + "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigensdk-go/logging" @@ -35,14 +36,6 @@ type EncodingStreamerMetrics struct { EncodedBlobs *prometheus.GaugeVec } -type TxnManagerMetrics struct { - Latency *prometheus.SummaryVec - GasUsed prometheus.Gauge - SpeedUps prometheus.Gauge - TxQueue prometheus.Gauge - NumTx *prometheus.CounterVec -} - type FinalizerMetrics struct { NumBlobs *prometheus.CounterVec LastSeenFinalizedBlock prometheus.Gauge @@ -55,7 +48,7 @@ type DispatcherMetrics struct { type Metrics struct { *EncodingStreamerMetrics - *TxnManagerMetrics + *common.TxnManagerMetrics *FinalizerMetrics *DispatcherMetrics @@ -88,46 +81,7 @@ func NewMetrics(httpPort string, logger logging.Logger) *Metrics { ), } - txnManagerMetrics := TxnManagerMetrics{ - Latency: promauto.With(reg).NewSummaryVec( - prometheus.SummaryOpts{ - Namespace: namespace, - Name: "txn_manager_latency_ms", - Help: "transaction confirmation latency summary in milliseconds", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001}, - }, - []string{"stage"}, - ), - GasUsed: promauto.With(reg).NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Name: "gas_used", - Help: "gas used for onchain batch confirmation", - }, - ), - SpeedUps: promauto.With(reg).NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Name: "speed_ups", - Help: "number of times the gas price was increased", - }, - ), - TxQueue: promauto.With(reg).NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Name: "tx_queue", - Help: "number of transactions in transaction queue", - }, - ), - NumTx: promauto.With(reg).NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Name: "tx_total", - Help: "number of transactions processed", - }, - []string{"state"}, - ), - } + txnManagerMetrics := common.NewTxnManagerMetrics(namespace, reg) finalizerMetrics := FinalizerMetrics{ NumBlobs: promauto.With(reg).NewCounterVec( @@ -170,7 +124,7 @@ func NewMetrics(httpPort string, logger logging.Logger) *Metrics { metrics := &Metrics{ EncodingStreamerMetrics: &encodingStreamerMetrics, - TxnManagerMetrics: &txnManagerMetrics, + TxnManagerMetrics: txnManagerMetrics, FinalizerMetrics: &finalizerMetrics, DispatcherMetrics: &dispatcherMatrics, Blob: promauto.With(reg).NewCounterVec( @@ -314,26 +268,6 @@ func (e *EncodingStreamerMetrics) UpdateEncodedBlobs(count int, size uint64) { e.EncodedBlobs.WithLabelValues("number").Set(float64(count)) } -func (t *TxnManagerMetrics) ObserveLatency(stage string, latencyMs float64) { - t.Latency.WithLabelValues(stage).Observe(latencyMs) -} - -func (t *TxnManagerMetrics) UpdateGasUsed(gasUsed uint64) { - t.GasUsed.Set(float64(gasUsed)) -} - -func (t *TxnManagerMetrics) UpdateSpeedUps(speedUps int) { - t.SpeedUps.Set(float64(speedUps)) -} - -func (t *TxnManagerMetrics) UpdateTxQueue(txQueue int) { - t.TxQueue.Set(float64(txQueue)) -} - -func (t *TxnManagerMetrics) IncrementTxnCount(state string) { - t.NumTx.WithLabelValues(state).Inc() -} - func (f *FinalizerMetrics) IncrementNumBlobs(state string) { f.NumBlobs.WithLabelValues(state).Inc() } diff --git a/disperser/cmd/batcher/main.go b/disperser/cmd/batcher/main.go index 34e5cae373..d6476d1bf2 100644 --- a/disperser/cmd/batcher/main.go +++ b/disperser/cmd/batcher/main.go @@ -231,7 +231,7 @@ func RunBatcher(ctx *cli.Context) error { return errors.New("no wallet is configured. Either Fireblocks or PrivateKey wallet should be configured") } - txnManager := batcher.NewTxnManager(client, wallet, config.EthClientConfig.NumConfirmations, 20, config.TimeoutConfig.TxnBroadcastTimeout, config.TimeoutConfig.ChainWriteTimeout, logger, metrics.TxnManagerMetrics) + txnManager := common.NewTxnManager(client, wallet, config.EthClientConfig.NumConfirmations, 20, config.TimeoutConfig.TxnBroadcastTimeout, config.TimeoutConfig.ChainWriteTimeout, logger, metrics.TxnManagerMetrics) batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics, handleBatchLivenessChan) if err != nil { return err diff --git a/test/integration_test.go b/test/integration_test.go index 56c7c7d92b..67e43c2243 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -124,7 +124,7 @@ type TestDisperser struct { server *apiserver.DispersalServer encoderServer *encoder.Server transactor *coremock.MockTransactor - txnManager *batchermock.MockTxnManager + txnManager *commonmock.MockTxnManager } func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser.BlobStore, logger logging.Logger) TestDisperser { @@ -169,7 +169,7 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser finalizer := batchermock.NewFinalizer() disperserMetrics := disperser.NewMetrics("9100", logger) - txnManager := batchermock.NewTxnManager() + txnManager := commonmock.NewTxnManager() batcher, err := batcher.NewBatcher(batcherConfig, timeoutConfig, store, dispatcher, cst, asn, encoderClient, agg, &commonmock.MockEthClient{}, finalizer, transactor, txnManager, logger, batcherMetrics, handleBatchLivenessChan) if err != nil { @@ -423,7 +423,7 @@ func TestDispersalAndRetrieval(t *testing.T) { }, BlockNumber: big.NewInt(123), } - err = dis.batcher.ProcessConfirmedBatch(ctx, &batcher.ReceiptOrErr{ + err = dis.batcher.ProcessConfirmedBatch(ctx, &common.ReceiptOrErr{ Receipt: receipt, Err: nil, Metadata: dis.txnManager.Requests[len(dis.txnManager.Requests)-1].Metadata,