From ccddab258bbe3ea77ea6fb9b4bf25989858795b8 Mon Sep 17 00:00:00 2001
From: Ian Shim
Date: Sat, 27 Jan 2024 17:15:14 -0800
Subject: [PATCH] finalize all confirmed blobs

---
 disperser/batcher/batcher.go | 1 +
 disperser/batcher/finalizer.go | 65 +++++++++++++++++++++++-----
 disperser/batcher/finalizer_test.go | 45 ++++++++++++++-----
 disperser/batcher/metrics.go | 51 ++++++++++++++++++++++
 disperser/cmd/batcher/config.go | 1 +
 disperser/cmd/batcher/flags/flags.go | 8 ++++
 disperser/cmd/batcher/main.go | 2 +-
 disperser/common/inmem/store.go | 23 +++++++---
 8 files changed, 167 insertions(+), 29 deletions(-)

diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go
index 39353d6422..c970373234 100644
--- a/disperser/batcher/batcher.go
+++ b/disperser/batcher/batcher.go
@@ -47,6 +47,7 @@ type TimeoutConfig struct {
 type Config struct {
     PullInterval time.Duration
     FinalizerInterval time.Duration
+    FinalizerPoolSize int
     EncoderSocket string
     SRSOrder int
     NumConnections int
diff --git a/disperser/batcher/finalizer.go b/disperser/batcher/finalizer.go
index c286ae1903..4cf21d97b1 100644
--- a/disperser/batcher/finalizer.go
+++ b/disperser/batcher/finalizer.go
@@ -11,6 +11,7 @@ import (
     "github.com/Layr-Labs/eigenda/disperser"
     "github.com/ethereum/go-ethereum"
     "github.com/ethereum/go-ethereum/core/types"
+    "github.com/gammazero/workerpool"
 
     gcommon "github.com/ethereum/go-ethereum/common"
 )
@@ -31,10 +32,24 @@ type finalizer struct {
     ethClient common.EthClient
     rpcClient common.RPCEthClient
     maxNumRetriesPerBlob uint
+    numBlobsPerFetch int32
+    numWorkers int
     logger common.Logger
+    metrics *FinalizerMetrics
 }
 
-func NewFinalizer(timeout time.Duration, loopInterval time.Duration, blobStore disperser.BlobStore, ethClient common.EthClient, rpcClient common.RPCEthClient, maxNumRetriesPerBlob uint, logger common.Logger) Finalizer {
+func NewFinalizer(
+    timeout time.Duration,
+    loopInterval time.Duration,
+    blobStore disperser.BlobStore,
+    ethClient common.EthClient,
+    rpcClient common.RPCEthClient,
+    maxNumRetriesPerBlob uint,
+    numBlobsPerFetch int32,
+    numWorkers int,
+    logger common.Logger,
+    metrics *FinalizerMetrics,
+) Finalizer {
     return &finalizer{
         timeout: timeout,
         loopInterval: loopInterval,
@@ -42,7 +57,10 @@ func NewFinalizer(timeout time.Duration, loopInterval time.Duration, blobStore d
         ethClient: ethClient,
         rpcClient: rpcClient,
         maxNumRetriesPerBlob: maxNumRetriesPerBlob,
+        numBlobsPerFetch: numBlobsPerFetch,
+        numWorkers: numWorkers,
         logger: logger,
+        metrics: metrics,
     }
 }
 
@@ -68,19 +86,43 @@ func (f *finalizer) Start(ctx context.Context) {
 // block number is less than or equal to the latest finalized block number.
 // If it fails to process some blobs, it will log the error, skip the failed blobs, and will not return an error. The function should be invoked again to retry.
 func (f *finalizer) FinalizeBlobs(ctx context.Context) error {
+    startTime := time.Now()
+    pool := workerpool.New(f.numWorkers)
     finalizedHeader, err := f.getLatestFinalizedBlock(ctx)
     if err != nil {
         return fmt.Errorf("FinalizeBlobs: error getting latest finalized block: %w", err)
     }
+    lastFinalBlock := finalizedHeader.Number.Uint64()
 
-    metadatas, err := f.blobStore.GetBlobMetadataByStatus(ctx, disperser.Confirmed)
-    if err != nil {
-        return fmt.Errorf("FinalizeBlobs: error getting blob headers: %w", err)
+    totalProcessed := 0
+    metadatas, exclusiveStartKey, err := f.blobStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Confirmed, f.numBlobsPerFetch, nil)
+    for len(metadatas) > 0 {
+        if err != nil {
+            return fmt.Errorf("FinalizeBlobs: error getting blob headers: %w", err)
+        }
+        metadatas := metadatas
+        f.logger.Info("FinalizeBlobs: finalizing blobs", "numBlobs", len(metadatas), "finalizedBlockNumber", lastFinalBlock)
+        pool.Submit(func() {
+            f.updateBlobs(ctx, metadatas, lastFinalBlock)
+        })
+        totalProcessed += len(metadatas)
+
+        if exclusiveStartKey == nil {
+            break
+        }
+        metadatas, exclusiveStartKey, err = f.blobStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Confirmed, f.numBlobsPerFetch, exclusiveStartKey)
     }
+    pool.StopWait()
+    f.logger.Info("FinalizeBlobs: successfully processed all finalized blobs", "finalizedBlockNumber", lastFinalBlock, "totalProcessed", totalProcessed, "elapsedTime", time.Since(startTime))
+    f.metrics.UpdateLastSeenFinalizedBlock(lastFinalBlock)
+    f.metrics.UpdateNumBlobs("processed", totalProcessed)
+    f.metrics.ObserveLatency("total", float64(time.Since(startTime).Milliseconds()))
+    return nil
+}
 
-    f.logger.Info("FinalizeBlobs: finalizing blobs", "numBlobs", len(metadatas), "finalizedBlockNumber", finalizedHeader.Number)
-
+func (f *finalizer) updateBlobs(ctx context.Context, metadatas []*disperser.BlobMetadata, lastFinalBlock uint64) {
     for _, m := range metadatas {
+        stageTimer := time.Now()
         blobKey := m.GetBlobKey()
         confirmationMetadata, err := f.blobStore.GetBlobMetadata(ctx, blobKey)
         if err != nil {
@@ -89,7 +131,7 @@ func (f *finalizer) FinalizeBlobs(ctx context.Context) error {
         }
 
         // Leave as confirmed if the confirmation block is after the latest finalized block (not yet finalized)
-        if uint64(confirmationMetadata.ConfirmationInfo.ConfirmationBlockNumber) > finalizedHeader.Number.Uint64() {
+        if uint64(confirmationMetadata.ConfirmationInfo.ConfirmationBlockNumber) > lastFinalBlock {
             continue
         }
 
@@ -101,15 +143,17 @@ func (f *finalizer) FinalizeBlobs(ctx context.Context) error {
             if err != nil {
                 f.logger.Error("FinalizeBlobs: error marking blob as failed", "blobKey", blobKey.String(), "err", err)
             }
+            f.metrics.IncrementNumBlobs("failed")
             continue
         }
         if err != nil {
             f.logger.Error("FinalizeBlobs: error getting transaction block number", "err", err)
+            f.metrics.IncrementNumBlobs("failed")
             continue
         }
 
         // Leave as confirmed if the reorged confirmation block is after the latest finalized block (not yet finalized)
-        if uint64(confirmationBlockNumber) > finalizedHeader.Number.Uint64() {
+        if uint64(confirmationBlockNumber) > lastFinalBlock {
             continue
         }
 
@@ -117,11 +161,12 @@ func (f *finalizer) FinalizeBlobs(ctx context.Context) error {
         err = f.blobStore.MarkBlobFinalized(ctx, blobKey)
         if err != nil {
             f.logger.Error("FinalizeBlobs: error marking blob as finalized", "blobKey", blobKey.String(), "err", err)
+            f.metrics.IncrementNumBlobs("failed")
             continue
         }
+        f.metrics.IncrementNumBlobs("finalized")
+        f.metrics.ObserveLatency("round", float64(time.Since(stageTimer).Milliseconds()))
     }
-    f.logger.Info("FinalizeBlobs: successfully processed all finalized blobs")
-    return nil
 }
 
 func (f *finalizer) getTransactionBlockNumber(ctx context.Context, hash gcommon.Hash) (uint64, error) {
diff --git a/disperser/batcher/finalizer_test.go b/disperser/batcher/finalizer_test.go
index f4038ca209..a240888a5f 100644
--- a/disperser/batcher/finalizer_test.go
+++ b/disperser/batcher/finalizer_test.go
@@ -39,7 +39,8 @@ func TestFinalizedBlob(t *testing.T) {
         BlockNumber: new(big.Int).SetUint64(1_000_000),
     }, nil)
 
-    finalizer := batcher.NewFinalizer(timeout, loopInterval, queue, ethClient, rpcClient, 1, logger)
+    metrics := batcher.NewMetrics("9100", logger)
+    finalizer := batcher.NewFinalizer(timeout, loopInterval, queue, ethClient, rpcClient, 1, 1, 1, logger, metrics.FinalizerMetrics)
 
     requestedAt := uint64(time.Now().UnixNano())
     blob := makeTestBlob([]*core.SecurityParam{{
@@ -47,7 +48,9 @@ func TestFinalizedBlob(t *testing.T) {
         AdversaryThreshold: 80,
     }})
     ctx := context.Background()
-    metadataKey, err := queue.StoreBlob(ctx, &blob, requestedAt)
+    metadataKey1, err := queue.StoreBlob(ctx, &blob, requestedAt)
+    assert.NoError(t, err)
+    metadataKey2, err := queue.StoreBlob(ctx, &blob, requestedAt+1)
     assert.NoError(t, err)
     batchHeaderHash := [32]byte{1, 2, 3}
     blobIndex := uint32(10)
@@ -66,9 +69,9 @@ func TestFinalizedBlob(t *testing.T) {
         ConfirmationBlockNumber: uint32(150),
         Fee: []byte{0},
     }
-    metadata := &disperser.BlobMetadata{
-        BlobHash: metadataKey.BlobHash,
-        MetadataHash: metadataKey.MetadataHash,
+    metadata1 := &disperser.BlobMetadata{
+        BlobHash: metadataKey1.BlobHash,
+        MetadataHash: metadataKey1.MetadataHash,
         BlobStatus: disperser.Processing,
         Expiry: 0,
         NumRetries: 0,
@@ -79,7 +82,23 @@ func TestFinalizedBlob(t *testing.T) {
             RequestedAt: requestedAt,
         },
     }
-    m, err := queue.MarkBlobConfirmed(ctx, metadata, confirmationInfo)
+    metadata2 := &disperser.BlobMetadata{
+        BlobHash: metadataKey2.BlobHash,
+        MetadataHash: metadataKey2.MetadataHash,
+        BlobStatus: disperser.Processing,
+        Expiry: 0,
+        NumRetries: 0,
+        RequestMetadata: &disperser.RequestMetadata{
+            BlobRequestHeader: core.BlobRequestHeader{
+                SecurityParams: blob.RequestHeader.SecurityParams,
+            },
+            RequestedAt: requestedAt + 1,
+        },
+    }
+    m, err := queue.MarkBlobConfirmed(ctx, metadata1, confirmationInfo)
+    assert.Equal(t, disperser.Confirmed, m.BlobStatus)
+    assert.NoError(t, err)
+    m, err = queue.MarkBlobConfirmed(ctx, metadata2, confirmationInfo)
     assert.Equal(t, disperser.Confirmed, m.BlobStatus)
     assert.NoError(t, err)
 
@@ -92,12 +111,14 @@ func TestFinalizedBlob(t *testing.T) {
 
     metadatas, err = queue.GetBlobMetadataByStatus(ctx, disperser.Finalized)
     assert.NoError(t, err)
-    assert.Len(t, metadatas, 1)
+    assert.Len(t, metadatas, 2)
 
-    assert.Equal(t, metadatas[0].BlobHash, metadataKey.BlobHash)
+    assert.ElementsMatch(t, []string{metadatas[0].BlobHash, metadatas[1].BlobHash}, []string{metadataKey1.BlobHash, metadataKey2.BlobHash})
     assert.Equal(t, metadatas[0].BlobStatus, disperser.Finalized)
-    assert.Equal(t, metadatas[0].RequestMetadata.RequestedAt, requestedAt)
+    assert.Equal(t, metadatas[1].BlobStatus, disperser.Finalized)
+    assert.ElementsMatch(t, []uint64{metadatas[0].RequestMetadata.RequestedAt, metadatas[1].RequestMetadata.RequestedAt}, []uint64{requestedAt, requestedAt + 1})
     assert.Equal(t, metadatas[0].RequestMetadata.SecurityParams, blob.RequestHeader.SecurityParams)
+    assert.Equal(t, metadatas[1].RequestMetadata.SecurityParams, blob.RequestHeader.SecurityParams)
 }
 
 func TestUnfinalizedBlob(t *testing.T) {
@@ -117,7 +138,8 @@ func TestUnfinalizedBlob(t *testing.T) {
         BlockNumber: new(big.Int).SetUint64(1_000_100),
     }, nil)
 
-    finalizer := batcher.NewFinalizer(timeout, loopInterval, queue, ethClient, rpcClient, 1, logger)
+    metrics := batcher.NewMetrics("9100", logger)
+    finalizer := batcher.NewFinalizer(timeout, loopInterval, queue, ethClient, rpcClient, 1, 1, 1, logger, metrics.FinalizerMetrics)
 
     requestedAt := uint64(time.Now().UnixNano())
     blob := makeTestBlob([]*core.SecurityParam{{
@@ -187,7 +209,8 @@ func TestNoReceipt(t *testing.T) {
     }).Return(nil)
     ethClient.On("TransactionReceipt", m.Anything, m.Anything).Return(nil, ethereum.NotFound)
 
-    finalizer := batcher.NewFinalizer(timeout, loopInterval, queue, ethClient, rpcClient, 1, logger)
+    metrics := batcher.NewMetrics("9100", logger)
+    finalizer := batcher.NewFinalizer(timeout, loopInterval, queue, ethClient, rpcClient, 1, 1, 1, logger, metrics.FinalizerMetrics)
 
     requestedAt := uint64(time.Now().UnixNano())
     blob := makeTestBlob([]*core.SecurityParam{{
diff --git a/disperser/batcher/metrics.go b/disperser/batcher/metrics.go
index c5dfce2573..aa70c53f6f 100644
--- a/disperser/batcher/metrics.go
+++ b/disperser/batcher/metrics.go
@@ -43,9 +43,16 @@ type TxnManagerMetrics struct {
     NumTx *prometheus.CounterVec
 }
 
+type FinalizerMetrics struct {
+    NumBlobs *prometheus.CounterVec
+    LastSeenFinalizedBlock prometheus.Gauge
+    Latency *prometheus.SummaryVec
+}
+
 type Metrics struct {
     *EncodingStreamerMetrics
     *TxnManagerMetrics
+    *FinalizerMetrics
 
     registry *prometheus.Registry
 
@@ -115,9 +122,37 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics {
         ),
     }
 
+    finalizerMetrics := FinalizerMetrics{
+        NumBlobs: promauto.With(reg).NewCounterVec(
+            prometheus.CounterOpts{
+                Namespace: namespace,
+                Name: "finalizer_num_blobs",
+                Help: "number of blobs in each state",
+            },
+            []string{"state"}, // possible values are "processed", "failed", "finalized"
+        ),
+        LastSeenFinalizedBlock: promauto.With(reg).NewGauge(
+            prometheus.GaugeOpts{
+                Namespace: namespace,
+                Name: "last_finalized_block",
+                Help: "last finalized block number",
+            },
+        ),
+        Latency: promauto.With(reg).NewSummaryVec(
+            prometheus.SummaryOpts{
+                Namespace: namespace,
+                Name: "finalizer_process_latency_ms",
+                Help: "finalizer process latency summary in milliseconds",
+                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
+            },
+            []string{"stage"}, // possible values are "round" and "total"
+        ),
+    }
+
     metrics := &Metrics{
         EncodingStreamerMetrics: &encodingStreamerMetrics,
         TxnManagerMetrics: &txnManagerMetrics,
+        FinalizerMetrics: &finalizerMetrics,
         Blob: promauto.With(reg).NewCounterVec(
             prometheus.CounterOpts{
                 Namespace: namespace,
@@ -248,3 +283,19 @@ func (t *TxnManagerMetrics) UpdateTxQueue(txQueue int) {
 func (t *TxnManagerMetrics) IncrementTxnCount(state string) {
     t.NumTx.WithLabelValues(state).Inc()
 }
+
+func (f *FinalizerMetrics) IncrementNumBlobs(state string) {
+    f.NumBlobs.WithLabelValues(state).Inc()
+}
+
+func (f *FinalizerMetrics) UpdateNumBlobs(state string, count int) {
+    f.NumBlobs.WithLabelValues(state).Add(float64(count))
+}
+
+func (f *FinalizerMetrics) UpdateLastSeenFinalizedBlock(blockNumber uint64) {
+    f.LastSeenFinalizedBlock.Set(float64(blockNumber))
+}
+
+func (f *FinalizerMetrics) ObserveLatency(stage string, latencyMs float64) {
+    f.Latency.WithLabelValues(stage).Observe(latencyMs)
+}
diff --git a/disperser/cmd/batcher/config.go b/disperser/cmd/batcher/config.go
index 49d918b5b8..e099604519 100644
--- a/disperser/cmd/batcher/config.go
+++ b/disperser/cmd/batcher/config.go
@@ -45,6 +45,7 @@ func NewConfig(ctx *cli.Context) Config {
         BatcherConfig: batcher.Config{
             PullInterval: ctx.GlobalDuration(flags.PullIntervalFlag.Name),
             FinalizerInterval: ctx.GlobalDuration(flags.FinalizerIntervalFlag.Name),
+            FinalizerPoolSize: ctx.GlobalInt(flags.FinalizerPoolSizeFlag.Name),
             EncoderSocket: ctx.GlobalString(flags.EncoderSocket.Name),
             NumConnections: ctx.GlobalInt(flags.NumConnectionsFlag.Name),
             EncodingRequestQueueSize: ctx.GlobalInt(flags.EncodingRequestQueueSizeFlag.Name),
diff --git a/disperser/cmd/batcher/flags/flags.go b/disperser/cmd/batcher/flags/flags.go
index 15f974dc8a..7a12856881 100644
--- a/disperser/cmd/batcher/flags/flags.go
+++ b/disperser/cmd/batcher/flags/flags.go
@@ -134,6 +134,13 @@ var (
         EnvVar: common.PrefixEnvVar(envVarPrefix, "FINALIZER_INTERVAL"),
         Value: 6 * time.Minute,
     }
+    FinalizerPoolSizeFlag = cli.IntFlag{
+        Name: common.PrefixFlag(FlagPrefix, "finalizer-pool-size"),
+        Usage: "Size of the finalizer workerpool",
+        Required: false,
+        EnvVar: common.PrefixEnvVar(envVarPrefix, "FINALIZER_POOL_SIZE"),
+        Value: 4,
+    }
     EncodingRequestQueueSizeFlag = cli.IntFlag{
         Name: common.PrefixFlag(FlagPrefix, "encoding-request-queue-size"),
         Usage: "Size of the encoding request queue",
@@ -196,6 +203,7 @@ var optionalFlags = []cli.Flag{
     ChainWriteTimeoutFlag,
     NumConnectionsFlag,
     FinalizerIntervalFlag,
+    FinalizerPoolSizeFlag,
     EncodingRequestQueueSizeFlag,
     MaxNumRetriesPerBlobFlag,
     TargetNumChunksFlag,
diff --git a/disperser/cmd/batcher/main.go b/disperser/cmd/batcher/main.go
index 1992215107..309c75369d 100644
--- a/disperser/cmd/batcher/main.go
+++ b/disperser/cmd/batcher/main.go
@@ -140,7 +140,7 @@ func RunBatcher(ctx *cli.Context) error {
     if err != nil {
         return err
     }
-    finalizer := batcher.NewFinalizer(config.TimeoutConfig.ChainReadTimeout, config.BatcherConfig.FinalizerInterval, queue, client, rpcClient, config.BatcherConfig.MaxNumRetriesPerBlob, logger)
+    finalizer := batcher.NewFinalizer(config.TimeoutConfig.ChainReadTimeout, config.BatcherConfig.FinalizerInterval, queue, client, rpcClient, config.BatcherConfig.MaxNumRetriesPerBlob, 1000, config.BatcherConfig.FinalizerPoolSize, logger, metrics.FinalizerMetrics)
     txnManager := batcher.NewTxnManager(client, 20, config.TimeoutConfig.ChainWriteTimeout, logger, metrics.TxnManagerMetrics)
     batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics)
     if err != nil {
diff --git a/disperser/common/inmem/store.go b/disperser/common/inmem/store.go
index 129d3733af..1482be99ac 100644
--- a/disperser/common/inmem/store.go
+++ b/disperser/common/inmem/store.go
@@ -159,14 +159,27 @@ func (q *BlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, s
     metas := make([]*disperser.BlobMetadata, 0)
     foundStart := exclusiveStartKey == nil
 
-    for _, meta := range q.Metadata {
+    keys := make([]disperser.BlobKey, len(q.Metadata))
+    i := 0
+    for k := range q.Metadata {
+        keys[i] = k
+        i++
+    }
+    sort.Slice(keys, func(i, j int) bool {
+        return q.Metadata[keys[i]].RequestMetadata.RequestedAt < q.Metadata[keys[j]].RequestMetadata.RequestedAt
+    })
+    for _, key := range keys {
+        meta := q.Metadata[key]
         if meta.BlobStatus == status {
             if foundStart {
                 metas = append(metas, meta)
                 if len(metas) == int(limit) {
-                    break
+                    return metas, &disperser.BlobStoreExclusiveStartKey{
+                        BlobStatus: int32(meta.BlobStatus),
+                        RequestedAt: int64(meta.RequestMetadata.RequestedAt),
+                    }, nil
                 }
-            } else if meta.BlobStatus == disperser.BlobStatus(exclusiveStartKey.BlobStatus) && meta.RequestMetadata.RequestedAt == uint64(exclusiveStartKey.RequestedAt) {
+            } else if meta.BlobStatus == disperser.BlobStatus(exclusiveStartKey.BlobStatus) && meta.RequestMetadata.RequestedAt > uint64(exclusiveStartKey.RequestedAt) {
                 foundStart = true // Found the starting point, start appending metas from next item
                 metas = append(metas, meta)
                 if len(metas) == int(limit) {
@@ -179,10 +192,6 @@ func (q *BlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, s
         }
     }
 
-    sort.SliceStable(metas, func(i, j int) bool {
-        return metas[i].RequestMetadata.RequestedAt < metas[j].RequestMetadata.RequestedAt
-    })
-
     // Return all the metas if limit is not reached
     return metas, nil, nil
 }