Commit 042b611
finalize all confirmed blobs
ian-shim committed Jan 29, 2024
1 parent b2d11b5 commit 042b611
Showing 8 changed files with 167 additions and 29 deletions.
1 change: 1 addition & 0 deletions disperser/batcher/batcher.go
@@ -47,6 +47,7 @@ type TimeoutConfig struct {
 type Config struct {
 	PullInterval             time.Duration
 	FinalizerInterval        time.Duration
+	FinalizerPoolSize        int
 	EncoderSocket            string
 	SRSOrder                 int
 	NumConnections           int
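The new `FinalizerPoolSize` field caps how many worker goroutines the finalizer runs at once. A minimal sketch of wiring it up — the values are illustrative, not defaults from this repo, and only fields relevant to this change are set:

```go
package main

import (
	"time"

	"github.com/Layr-Labs/eigenda/disperser/batcher"
)

func main() {
	// Hedged sketch: illustrative values, not the project's defaults.
	cfg := batcher.Config{
		PullInterval:      60 * time.Second,
		FinalizerInterval: 6 * time.Minute,
		FinalizerPoolSize: 4, // new field: bounds concurrent finalizer workers
	}
	_ = cfg
}
```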
65 changes: 55 additions & 10 deletions disperser/batcher/finalizer.go
@@ -11,6 +11,7 @@ import (
 	"github.com/Layr-Labs/eigenda/disperser"
 	"github.com/ethereum/go-ethereum"
 	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/gammazero/workerpool"

 	gcommon "github.com/ethereum/go-ethereum/common"
 )
@@ -31,18 +32,35 @@ type finalizer struct {
 	ethClient            common.EthClient
 	rpcClient            common.RPCEthClient
 	maxNumRetriesPerBlob uint
+	numBlobsPerFetch     int32
+	numWorkers           int
 	logger               common.Logger
+	metrics              *FinalizerMetrics
 }

-func NewFinalizer(timeout time.Duration, loopInterval time.Duration, blobStore disperser.BlobStore, ethClient common.EthClient, rpcClient common.RPCEthClient, maxNumRetriesPerBlob uint, logger common.Logger) Finalizer {
+func NewFinalizer(
+	timeout time.Duration,
+	loopInterval time.Duration,
+	blobStore disperser.BlobStore,
+	ethClient common.EthClient,
+	rpcClient common.RPCEthClient,
+	maxNumRetriesPerBlob uint,
+	numBlobsPerFetch int32,
+	numWorkers int,
+	logger common.Logger,
+	metrics *FinalizerMetrics,
+) Finalizer {
 	return &finalizer{
 		timeout:              timeout,
 		loopInterval:         loopInterval,
 		blobStore:            blobStore,
 		ethClient:            ethClient,
 		rpcClient:            rpcClient,
 		maxNumRetriesPerBlob: maxNumRetriesPerBlob,
+		numBlobsPerFetch:     numBlobsPerFetch,
+		numWorkers:           numWorkers,
 		logger:               logger,
+		metrics:              metrics,
 	}
 }

@@ -68,19 +86,43 @@ func (f *finalizer) Start(ctx context.Context) {
 // block number is less than or equal to the latest finalized block number.
 // If it fails to process some blobs, it will log the error, skip the failed blobs, and will not return an error. The function should be invoked again to retry.
 func (f *finalizer) FinalizeBlobs(ctx context.Context) error {
+	startTime := time.Now()
+	pool := workerpool.New(f.numWorkers)
 	finalizedHeader, err := f.getLatestFinalizedBlock(ctx)
 	if err != nil {
 		return fmt.Errorf("FinalizeBlobs: error getting latest finalized block: %w", err)
 	}
+	lastFinalBlock := finalizedHeader.Number.Uint64()

-	metadatas, err := f.blobStore.GetBlobMetadataByStatus(ctx, disperser.Confirmed)
-	if err != nil {
-		return fmt.Errorf("FinalizeBlobs: error getting blob headers: %w", err)
+	totalProcessed := 0
+	metadatas, exclusiveStartKey, err := f.blobStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Confirmed, f.numBlobsPerFetch, nil)
+	for len(metadatas) > 0 {
+		if err != nil {
+			return fmt.Errorf("FinalizeBlobs: error getting blob headers: %w", err)
+		}
+		metadatas := metadatas // shadow the loop variable so each pool task captures its own page
+		f.logger.Info("FinalizeBlobs: finalizing blobs", "numBlobs", len(metadatas), "finalizedBlockNumber", lastFinalBlock)
+		pool.Submit(func() {
+			f.updateBlobs(ctx, metadatas, lastFinalBlock)
+		})
+		totalProcessed += len(metadatas)
+
+		if exclusiveStartKey == nil {
+			break
+		}
+		metadatas, exclusiveStartKey, err = f.blobStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Confirmed, f.numBlobsPerFetch, exclusiveStartKey)
 	}
+	pool.StopWait()
+	f.logger.Info("FinalizeBlobs: successfully processed all finalized blobs", "finalizedBlockNumber", lastFinalBlock, "totalProcessed", totalProcessed, "elapsedTime", time.Since(startTime))
+	f.metrics.UpdateLastFinalizedBlock(lastFinalBlock)
+	f.metrics.UpdateBlobs("processed", totalProcessed)
+	f.metrics.ObserveLatency("total", float64(time.Since(startTime).Milliseconds()))
+	return nil
+}

-	f.logger.Info("FinalizeBlobs: finalizing blobs", "numBlobs", len(metadatas), "finalizedBlockNumber", finalizedHeader.Number)
-
+func (f *finalizer) updateBlobs(ctx context.Context, metadatas []*disperser.BlobMetadata, lastFinalBlock uint64) {
 	for _, m := range metadatas {
+		stageTimer := time.Now()
 		blobKey := m.GetBlobKey()
 		confirmationMetadata, err := f.blobStore.GetBlobMetadata(ctx, blobKey)
 		if err != nil {
@@ -89,7 +131,7 @@ func (f *finalizer) FinalizeBlobs(ctx context.Context) error {
 		}

 		// Leave as confirmed if the confirmation block is after the latest finalized block (not yet finalized)
-		if uint64(confirmationMetadata.ConfirmationInfo.ConfirmationBlockNumber) > finalizedHeader.Number.Uint64() {
+		if uint64(confirmationMetadata.ConfirmationInfo.ConfirmationBlockNumber) > lastFinalBlock {
 			continue
 		}

@@ -101,27 +143,30 @@ func (f *finalizer) FinalizeBlobs(ctx context.Context) error {
 			if err != nil {
 				f.logger.Error("FinalizeBlobs: error marking blob as failed", "blobKey", blobKey.String(), "err", err)
 			}
+			f.metrics.IncrementBlobs("failed")
 			continue
 		}
 		if err != nil {
 			f.logger.Error("FinalizeBlobs: error getting transaction block number", "err", err)
+			f.metrics.IncrementBlobs("failed")
 			continue
 		}

 		// Leave as confirmed if the reorged confirmation block is after the latest finalized block (not yet finalized)
-		if uint64(confirmationBlockNumber) > finalizedHeader.Number.Uint64() {
+		if uint64(confirmationBlockNumber) > lastFinalBlock {
 			continue
 		}

 		confirmationMetadata.ConfirmationInfo.ConfirmationBlockNumber = uint32(confirmationBlockNumber)
 		err = f.blobStore.MarkBlobFinalized(ctx, blobKey)
 		if err != nil {
 			f.logger.Error("FinalizeBlobs: error marking blob as finalized", "blobKey", blobKey.String(), "err", err)
+			f.metrics.IncrementBlobs("failed")
 			continue
 		}
+		f.metrics.IncrementBlobs("finalized")
+		f.metrics.ObserveLatency("round", float64(time.Since(stageTimer).Milliseconds()))
 	}
-	f.logger.Info("FinalizeBlobs: successfully processed all finalized blobs")
-	return nil
 }

func (f *finalizer) getTransactionBlockNumber(ctx context.Context, hash gcommon.Hash) (uint64, error) {
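The reworked FinalizeBlobs fetches confirmed blobs one page at a time and submits each page to a bounded worker pool, then blocks on StopWait until every page has been processed. A self-contained sketch of that fetch-page/submit pattern using github.com/gammazero/workerpool — the pages data, the fetchPage helper, and the pool size are invented for illustration:

```go
package main

import (
	"fmt"

	"github.com/gammazero/workerpool"
)

// pages stands in for a paginated store; each inner slice is one page.
var pages = [][]int{{1, 2, 3}, {4, 5}, {6}}

// fetchPage mimics GetBlobMetadataByStatusWithPagination: it returns one page
// and a cursor to pass back in, or a nil cursor when there are no more pages.
func fetchPage(cursor *int) ([]int, *int) {
	i := 0
	if cursor != nil {
		i = *cursor
	}
	if i >= len(pages) {
		return nil, nil
	}
	next := i + 1
	if next >= len(pages) {
		return pages[i], nil
	}
	return pages[i], &next
}

func main() {
	pool := workerpool.New(4) // bounded concurrency, like FinalizerPoolSize
	page, cursor := fetchPage(nil)
	for len(page) > 0 {
		page := page // shadow so each submitted closure keeps its own page
		pool.Submit(func() {
			fmt.Println("processing", len(page), "items")
		})
		if cursor == nil {
			break
		}
		page, cursor = fetchPage(cursor)
	}
	pool.StopWait() // wait for all submitted pages to finish
}
```

As in the real code, a nil cursor ends the loop after the final page, and StopWait provides the barrier before any summary logging or metrics.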
45 changes: 34 additions & 11 deletions disperser/batcher/finalizer_test.go
@@ -39,15 +39,18 @@ func TestFinalizedBlob(t *testing.T) {
 		BlockNumber: new(big.Int).SetUint64(1_000_000),
 	}, nil)

-	finalizer := batcher.NewFinalizer(timeout, loopInterval, queue, ethClient, rpcClient, 1, logger)
+	metrics := batcher.NewMetrics("9100", logger)
+	finalizer := batcher.NewFinalizer(timeout, loopInterval, queue, ethClient, rpcClient, 1, 1, 1, logger, metrics.FinalizerMetrics)

 	requestedAt := uint64(time.Now().UnixNano())
 	blob := makeTestBlob([]*core.SecurityParam{{
 		QuorumID:           0,
 		AdversaryThreshold: 80,
 	}})
 	ctx := context.Background()
-	metadataKey, err := queue.StoreBlob(ctx, &blob, requestedAt)
+	metadataKey1, err := queue.StoreBlob(ctx, &blob, requestedAt)
 	assert.NoError(t, err)
+	metadataKey2, err := queue.StoreBlob(ctx, &blob, requestedAt+1)
+	assert.NoError(t, err)
 	batchHeaderHash := [32]byte{1, 2, 3}
 	blobIndex := uint32(10)
@@ -66,9 +69,9 @@
 		ConfirmationBlockNumber: uint32(150),
 		Fee:                     []byte{0},
 	}
-	metadata := &disperser.BlobMetadata{
-		BlobHash:     metadataKey.BlobHash,
-		MetadataHash: metadataKey.MetadataHash,
+	metadata1 := &disperser.BlobMetadata{
+		BlobHash:     metadataKey1.BlobHash,
+		MetadataHash: metadataKey1.MetadataHash,
 		BlobStatus:   disperser.Processing,
 		Expiry:       0,
 		NumRetries:   0,
@@ -79,7 +82,23 @@
 			RequestedAt: requestedAt,
 		},
 	}
-	m, err := queue.MarkBlobConfirmed(ctx, metadata, confirmationInfo)
+	metadata2 := &disperser.BlobMetadata{
+		BlobHash:     metadataKey2.BlobHash,
+		MetadataHash: metadataKey2.MetadataHash,
+		BlobStatus:   disperser.Processing,
+		Expiry:       0,
+		NumRetries:   0,
+		RequestMetadata: &disperser.RequestMetadata{
+			BlobRequestHeader: core.BlobRequestHeader{
+				SecurityParams: blob.RequestHeader.SecurityParams,
+			},
+			RequestedAt: requestedAt + 1,
+		},
+	}
+	m, err := queue.MarkBlobConfirmed(ctx, metadata1, confirmationInfo)
 	assert.Equal(t, disperser.Confirmed, m.BlobStatus)
 	assert.NoError(t, err)
+	m, err = queue.MarkBlobConfirmed(ctx, metadata2, confirmationInfo)
+	assert.Equal(t, disperser.Confirmed, m.BlobStatus)
+	assert.NoError(t, err)

@@ -92,12 +111,14 @@

 	metadatas, err = queue.GetBlobMetadataByStatus(ctx, disperser.Finalized)
 	assert.NoError(t, err)
-	assert.Len(t, metadatas, 1)
+	assert.Len(t, metadatas, 2)

-	assert.Equal(t, metadatas[0].BlobHash, metadataKey.BlobHash)
+	assert.ElementsMatch(t, []string{metadatas[0].BlobHash, metadatas[1].BlobHash}, []string{metadataKey1.BlobHash, metadataKey2.BlobHash})
 	assert.Equal(t, metadatas[0].BlobStatus, disperser.Finalized)
-	assert.Equal(t, metadatas[0].RequestMetadata.RequestedAt, requestedAt)
+	assert.Equal(t, metadatas[1].BlobStatus, disperser.Finalized)
+	assert.ElementsMatch(t, []uint64{metadatas[0].RequestMetadata.RequestedAt, metadatas[1].RequestMetadata.RequestedAt}, []uint64{requestedAt, requestedAt + 1})
 	assert.Equal(t, metadatas[0].RequestMetadata.SecurityParams, blob.RequestHeader.SecurityParams)
+	assert.Equal(t, metadatas[1].RequestMetadata.SecurityParams, blob.RequestHeader.SecurityParams)
 }

func TestUnfinalizedBlob(t *testing.T) {
@@ -117,7 +138,8 @@ func TestUnfinalizedBlob(t *testing.T) {
 		BlockNumber: new(big.Int).SetUint64(1_000_100),
 	}, nil)

-	finalizer := batcher.NewFinalizer(timeout, loopInterval, queue, ethClient, rpcClient, 1, logger)
+	metrics := batcher.NewMetrics("9100", logger)
+	finalizer := batcher.NewFinalizer(timeout, loopInterval, queue, ethClient, rpcClient, 1, 1, 1, logger, metrics.FinalizerMetrics)

 	requestedAt := uint64(time.Now().UnixNano())
 	blob := makeTestBlob([]*core.SecurityParam{{
@@ -187,7 +209,8 @@ func TestNoReceipt(t *testing.T) {
 	}).Return(nil)
 	ethClient.On("TransactionReceipt", m.Anything, m.Anything).Return(nil, ethereum.NotFound)

-	finalizer := batcher.NewFinalizer(timeout, loopInterval, queue, ethClient, rpcClient, 1, logger)
+	metrics := batcher.NewMetrics("9100", logger)
+	finalizer := batcher.NewFinalizer(timeout, loopInterval, queue, ethClient, rpcClient, 1, 1, 1, logger, metrics.FinalizerMetrics)

 	requestedAt := uint64(time.Now().UnixNano())
 	blob := makeTestBlob([]*core.SecurityParam{{
51 changes: 51 additions & 0 deletions disperser/batcher/metrics.go
@@ -43,9 +43,16 @@ type TxnManagerMetrics struct {
 	NumTx *prometheus.CounterVec
 }

+type FinalizerMetrics struct {
+	Blobs              *prometheus.CounterVec
+	LastFinalizedBlock prometheus.Gauge
+	Latency            *prometheus.SummaryVec
+}
+
 type Metrics struct {
 	*EncodingStreamerMetrics
 	*TxnManagerMetrics
+	*FinalizerMetrics

 	registry *prometheus.Registry

@@ -115,9 +122,37 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics {
 		),
 	}

+	finalizerMetrics := FinalizerMetrics{
+		Blobs: promauto.With(reg).NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace: namespace,
+				Name:      "finalizer_num_blobs",
+				Help:      "number of blobs in each state",
+			},
+			[]string{"state"},
+		),
+		LastFinalizedBlock: promauto.With(reg).NewGauge(
+			prometheus.GaugeOpts{
+				Namespace: namespace,
+				Name:      "last_finalized_block",
+				Help:      "last finalized block number",
+			},
+		),
+		Latency: promauto.With(reg).NewSummaryVec(
+			prometheus.SummaryOpts{
+				Namespace:  namespace,
+				Name:       "finalizer_process_latency_ms",
+				Help:       "finalizer process latency summary in milliseconds",
+				Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
+			},
+			[]string{"stage"},
+		),
+	}
+
 	metrics := &Metrics{
 		EncodingStreamerMetrics: &encodingStreamerMetrics,
 		TxnManagerMetrics:       &txnManagerMetrics,
+		FinalizerMetrics:        &finalizerMetrics,
 		Blob: promauto.With(reg).NewCounterVec(
 			prometheus.CounterOpts{
 				Namespace: namespace,
@@ -248,3 +283,19 @@ func (t *TxnManagerMetrics) UpdateTxQueue(txQueue int) {
 func (t *TxnManagerMetrics) IncrementTxnCount(state string) {
 	t.NumTx.WithLabelValues(state).Inc()
 }
+
+func (f *FinalizerMetrics) IncrementBlobs(state string) {
+	f.Blobs.WithLabelValues(state).Inc()
+}
+
+func (f *FinalizerMetrics) UpdateBlobs(state string, count int) {
+	f.Blobs.WithLabelValues(state).Add(float64(count))
+}
+
+func (f *FinalizerMetrics) UpdateLastFinalizedBlock(blockNumber uint64) {
+	f.LastFinalizedBlock.Set(float64(blockNumber))
+}
+
+func (f *FinalizerMetrics) ObserveLatency(stage string, latencyMs float64) {
+	f.Latency.WithLabelValues(stage).Observe(latencyMs)
+}
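Taken together, these helpers expose one state-labeled counter, one gauge, and one stage-labeled latency summary. A hedged sketch of how they are driven, mirroring the calls FinalizeBlobs now makes — the block number, count, and latency values are illustrative:

```go
package metricsexample

import "github.com/Layr-Labs/eigenda/disperser/batcher"

// RecordRound exercises the new FinalizerMetrics helpers in one place.
func RecordRound(m *batcher.Metrics) {
	fm := m.FinalizerMetrics
	fm.UpdateLastFinalizedBlock(19_000_000) // gauge: latest finalized block seen
	fm.IncrementBlobs("finalized")          // counter: one blob entered "finalized"
	fm.UpdateBlobs("processed", 128)        // counter: bulk add after a page of blobs
	fm.ObserveLatency("total", 42.0)        // stage-labeled summary, in milliseconds
}
```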
1 change: 1 addition & 0 deletions disperser/cmd/batcher/config.go
@@ -45,6 +45,7 @@ func NewConfig(ctx *cli.Context) Config {
 		BatcherConfig: batcher.Config{
 			PullInterval:             ctx.GlobalDuration(flags.PullIntervalFlag.Name),
 			FinalizerInterval:        ctx.GlobalDuration(flags.FinalizerIntervalFlag.Name),
+			FinalizerPoolSize:        ctx.GlobalInt(flags.FinalizerPoolSizeFlag.Name),
 			EncoderSocket:            ctx.GlobalString(flags.EncoderSocket.Name),
 			NumConnections:           ctx.GlobalInt(flags.NumConnectionsFlag.Name),
 			EncodingRequestQueueSize: ctx.GlobalInt(flags.EncodingRequestQueueSizeFlag.Name),
8 changes: 8 additions & 0 deletions disperser/cmd/batcher/flags/flags.go
@@ -134,6 +134,13 @@
 		EnvVar:   common.PrefixEnvVar(envVarPrefix, "FINALIZER_INTERVAL"),
 		Value:    6 * time.Minute,
 	}
+	FinalizerPoolSizeFlag = cli.IntFlag{
+		Name:     common.PrefixFlag(FlagPrefix, "finalizer-pool-size"),
+		Usage:    "Size of the finalizer workerpool",
+		Required: false,
+		EnvVar:   common.PrefixEnvVar(envVarPrefix, "FINALIZER_POOL_SIZE"),
+		Value:    4,
+	}
 	EncodingRequestQueueSizeFlag = cli.IntFlag{
 		Name:     common.PrefixFlag(FlagPrefix, "encoding-request-queue-size"),
 		Usage:    "Size of the encoding request queue",
@@ -196,6 +203,7 @@ var optionalFlags = []cli.Flag{
 	ChainWriteTimeoutFlag,
 	NumConnectionsFlag,
 	FinalizerIntervalFlag,
+	FinalizerPoolSizeFlag,
 	EncodingRequestQueueSizeFlag,
 	MaxNumRetriesPerBlobFlag,
 	TargetNumChunksFlag,
2 changes: 1 addition & 1 deletion disperser/cmd/batcher/main.go
@@ -140,7 +140,7 @@ func RunBatcher(ctx *cli.Context) error {
 	if err != nil {
 		return err
 	}
-	finalizer := batcher.NewFinalizer(config.TimeoutConfig.ChainReadTimeout, config.BatcherConfig.FinalizerInterval, queue, client, rpcClient, config.BatcherConfig.MaxNumRetriesPerBlob, logger)
+	finalizer := batcher.NewFinalizer(config.TimeoutConfig.ChainReadTimeout, config.BatcherConfig.FinalizerInterval, queue, client, rpcClient, config.BatcherConfig.MaxNumRetriesPerBlob, 1000, config.BatcherConfig.FinalizerPoolSize, logger, metrics.FinalizerMetrics)
 	txnManager := batcher.NewTxnManager(client, 20, config.TimeoutConfig.ChainWriteTimeout, logger, metrics.TxnManagerMetrics)
 	batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics)
 	if err != nil {
23 changes: 16 additions & 7 deletions disperser/common/inmem/store.go
@@ -159,14 +159,27 @@ func (q *BlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, s
 	metas := make([]*disperser.BlobMetadata, 0)
 	foundStart := exclusiveStartKey == nil

-	for _, meta := range q.Metadata {
+	keys := make([]disperser.BlobKey, len(q.Metadata))
+	i := 0
+	for k := range q.Metadata {
+		keys[i] = k
+		i++
+	}
+	sort.Slice(keys, func(i, j int) bool {
+		return q.Metadata[keys[i]].RequestMetadata.RequestedAt < q.Metadata[keys[j]].RequestMetadata.RequestedAt
+	})
+	for _, key := range keys {
+		meta := q.Metadata[key]
 		if meta.BlobStatus == status {
 			if foundStart {
 				metas = append(metas, meta)
 				if len(metas) == int(limit) {
-					break
+					return metas, &disperser.BlobStoreExclusiveStartKey{
+						BlobStatus:  int32(meta.BlobStatus),
+						RequestedAt: int64(meta.RequestMetadata.RequestedAt),
+					}, nil
 				}
-			} else if meta.BlobStatus == disperser.BlobStatus(exclusiveStartKey.BlobStatus) && meta.RequestMetadata.RequestedAt == uint64(exclusiveStartKey.RequestedAt) {
+			} else if meta.BlobStatus == disperser.BlobStatus(exclusiveStartKey.BlobStatus) && meta.RequestMetadata.RequestedAt > uint64(exclusiveStartKey.RequestedAt) {
 				foundStart = true // Found the starting point, start appending metas from next item
 				metas = append(metas, meta)
 				if len(metas) == int(limit) {
@@ -179,10 +192,6 @@ func (q *BlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, s
 		}
 	}

-	sort.SliceStable(metas, func(i, j int) bool {
-		return metas[i].RequestMetadata.RequestedAt < metas[j].RequestMetadata.RequestedAt
-	})
-
 	// Return all the metas if limit is not reached
 	return metas, nil, nil
 }
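Callers page through this API by feeding the returned exclusive start key back in until it comes back nil — the same loop the new FinalizeBlobs runs. A hedged caller sketch (the function name and the page size of 100 are illustrative):

```go
package blobscan

import (
	"context"

	"github.com/Layr-Labs/eigenda/disperser"
)

// DrainConfirmed collects every Confirmed blob, one page at a time.
func DrainConfirmed(ctx context.Context, store disperser.BlobStore) ([]*disperser.BlobMetadata, error) {
	var all []*disperser.BlobMetadata
	var startKey *disperser.BlobStoreExclusiveStartKey
	for {
		page, nextKey, err := store.GetBlobMetadataByStatusWithPagination(ctx, disperser.Confirmed, 100, startKey)
		if err != nil {
			return nil, err
		}
		all = append(all, page...)
		if nextKey == nil {
			return all, nil // nil key: no more pages
		}
		startKey = nextKey // resume strictly after the last item seen
	}
}
```

Note the comparison change from == to > in the store: the scan now resumes strictly after the start key instead of re-including the boundary item, which keeps pages from overlapping.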
