[finalizer] Go through all confirmed blobs #217

Merged (1 commit, Feb 9, 2024)
1 change: 1 addition & 0 deletions disperser/batcher/batcher.go
@@ -47,6 +47,7 @@ type TimeoutConfig struct {
type Config struct {
PullInterval time.Duration
FinalizerInterval time.Duration
FinalizerPoolSize int
EncoderSocket string
SRSOrder int
NumConnections int
65 changes: 55 additions & 10 deletions disperser/batcher/finalizer.go
@@ -11,6 +11,7 @@ import (
"github.com/Layr-Labs/eigenda/disperser"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/gammazero/workerpool"

gcommon "github.com/ethereum/go-ethereum/common"
)
@@ -31,18 +32,35 @@ type finalizer struct {
ethClient common.EthClient
rpcClient common.RPCEthClient
maxNumRetriesPerBlob uint
numBlobsPerFetch int32
numWorkers int
logger common.Logger
metrics *FinalizerMetrics
}

func NewFinalizer(timeout time.Duration, loopInterval time.Duration, blobStore disperser.BlobStore, ethClient common.EthClient, rpcClient common.RPCEthClient, maxNumRetriesPerBlob uint, logger common.Logger) Finalizer {
func NewFinalizer(
timeout time.Duration,
loopInterval time.Duration,
blobStore disperser.BlobStore,
ethClient common.EthClient,
rpcClient common.RPCEthClient,
maxNumRetriesPerBlob uint,
numBlobsPerFetch int32,
numWorkers int,
logger common.Logger,
metrics *FinalizerMetrics,
) Finalizer {
return &finalizer{
timeout: timeout,
loopInterval: loopInterval,
blobStore: blobStore,
ethClient: ethClient,
rpcClient: rpcClient,
maxNumRetriesPerBlob: maxNumRetriesPerBlob,
numBlobsPerFetch: numBlobsPerFetch,
numWorkers: numWorkers,
logger: logger,
metrics: metrics,
}
}

@@ -68,19 +86,43 @@ func (f *finalizer) Start(ctx context.Context) {
// block number is less than or equal to the latest finalized block number.
// If it fails to process some blobs, it will log the error, skip the failed blobs, and will not return an error. The function should be invoked again to retry.
func (f *finalizer) FinalizeBlobs(ctx context.Context) error {
startTime := time.Now()
pool := workerpool.New(f.numWorkers)
finalizedHeader, err := f.getLatestFinalizedBlock(ctx)
if err != nil {
return fmt.Errorf("FinalizeBlobs: error getting latest finalized block: %w", err)
}
lastFinalBlock := finalizedHeader.Number.Uint64()

metadatas, err := f.blobStore.GetBlobMetadataByStatus(ctx, disperser.Confirmed)
if err != nil {
return fmt.Errorf("FinalizeBlobs: error getting blob headers: %w", err)
totalProcessed := 0
metadatas, exclusiveStartKey, err := f.blobStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Confirmed, f.numBlobsPerFetch, nil)
for len(metadatas) > 0 {
if err != nil {
return fmt.Errorf("FinalizeBlobs: error getting blob headers: %w", err)
}
metadatas := metadatas
f.logger.Info("FinalizeBlobs: finalizing blobs", "numBlobs", len(metadatas), "finalizedBlockNumber", lastFinalBlock)
pool.Submit(func() {
f.updateBlobs(ctx, metadatas, lastFinalBlock)
})
totalProcessed += len(metadatas)

if exclusiveStartKey == nil {
break
}
metadatas, exclusiveStartKey, err = f.blobStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Confirmed, f.numBlobsPerFetch, exclusiveStartKey)
}
pool.StopWait()
f.logger.Info("FinalizeBlobs: successfully processed all finalized blobs", "finalizedBlockNumber", lastFinalBlock, "totalProcessed", totalProcessed, "elapsedTime", time.Since(startTime))
f.metrics.UpdateLastSeenFinalizedBlock(lastFinalBlock)
f.metrics.UpdateNumBlobs("processed", totalProcessed)
f.metrics.ObserveLatency("total", float64(time.Since(startTime).Milliseconds()))
return nil
}

f.logger.Info("FinalizeBlobs: finalizing blobs", "numBlobs", len(metadatas), "finalizedBlockNumber", finalizedHeader.Number)

func (f *finalizer) updateBlobs(ctx context.Context, metadatas []*disperser.BlobMetadata, lastFinalBlock uint64) {
for _, m := range metadatas {
stageTimer := time.Now()
blobKey := m.GetBlobKey()
confirmationMetadata, err := f.blobStore.GetBlobMetadata(ctx, blobKey)
if err != nil {
@@ -89,7 +131,7 @@ func (f *finalizer) FinalizeBlobs(ctx context.Context) error {
}

// Leave as confirmed if the confirmation block is after the latest finalized block (not yet finalized)
if uint64(confirmationMetadata.ConfirmationInfo.ConfirmationBlockNumber) > finalizedHeader.Number.Uint64() {
if uint64(confirmationMetadata.ConfirmationInfo.ConfirmationBlockNumber) > lastFinalBlock {
continue
}

@@ -101,27 +143,30 @@
if err != nil {
f.logger.Error("FinalizeBlobs: error marking blob as failed", "blobKey", blobKey.String(), "err", err)
}
f.metrics.IncrementNumBlobs("failed")
continue
}
if err != nil {
f.logger.Error("FinalizeBlobs: error getting transaction block number", "err", err)
f.metrics.IncrementNumBlobs("failed")
continue
}

// Leave as confirmed if the reorged confirmation block is after the latest finalized block (not yet finalized)
if uint64(confirmationBlockNumber) > finalizedHeader.Number.Uint64() {
if uint64(confirmationBlockNumber) > lastFinalBlock {
continue
}

confirmationMetadata.ConfirmationInfo.ConfirmationBlockNumber = uint32(confirmationBlockNumber)
err = f.blobStore.MarkBlobFinalized(ctx, blobKey)
if err != nil {
f.logger.Error("FinalizeBlobs: error marking blob as finalized", "blobKey", blobKey.String(), "err", err)
f.metrics.IncrementNumBlobs("failed")
continue
}
f.metrics.IncrementNumBlobs("finalized")
f.metrics.ObserveLatency("round", float64(time.Since(stageTimer).Milliseconds()))
}
f.logger.Info("FinalizeBlobs: successfully processed all finalized blobs")
return nil
}

func (f *finalizer) getTransactionBlockNumber(ctx context.Context, hash gcommon.Hash) (uint64, error) {
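For readers skimming the diff, the new FinalizeBlobs control flow is: fetch one page of confirmed blobs at a time, hand each page to a bounded worker pool, and stop once the store returns no continuation key. Below is a minimal, self-contained sketch of that shape; `fetchPage`, `pageKey`, and the integer items are invented stand-ins for `GetBlobMetadataByStatusWithPagination` and blob metadata, and the loop is restructured slightly so each page variable is fresh per iteration. Only the workerpool calls (`New`, `Submit`, `StopWait`) mirror the PR.

```go
package main

import (
	"fmt"

	"github.com/gammazero/workerpool"
)

// pageKey is a stand-in for disperser.BlobStoreExclusiveStartKey.
type pageKey struct{ offset int }

// fetchPage is a stand-in for GetBlobMetadataByStatusWithPagination: it
// returns up to limit items plus a key for the next page, or a nil key
// when the data set is exhausted.
func fetchPage(key *pageKey, limit int) ([]int, *pageKey) {
	const total = 10 // pretend the store holds 10 confirmed blobs
	start := 0
	if key != nil {
		start = key.offset
	}
	end := start + limit
	if end >= total {
		end = total
	}
	items := make([]int, 0, end-start)
	for i := start; i < end; i++ {
		items = append(items, i)
	}
	if end == total {
		return items, nil // last page
	}
	return items, &pageKey{offset: end}
}

func main() {
	pool := workerpool.New(4) // bounded concurrency, like FinalizerPoolSize
	var key *pageKey
	for {
		items, next := fetchPage(key, 3) // fresh variables each iteration, safe to capture below
		if len(items) == 0 {
			break
		}
		pool.Submit(func() {
			fmt.Println("processing page", items)
		})
		if next == nil {
			break
		}
		key = next
	}
	pool.StopWait() // block until every submitted page has been processed
	fmt.Println("all pages processed")
}
```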
45 changes: 34 additions & 11 deletions disperser/batcher/finalizer_test.go
@@ -39,15 +39,18 @@ func TestFinalizedBlob(t *testing.T) {
BlockNumber: new(big.Int).SetUint64(1_000_000),
}, nil)

finalizer := batcher.NewFinalizer(timeout, loopInterval, queue, ethClient, rpcClient, 1, logger)
metrics := batcher.NewMetrics("9100", logger)
finalizer := batcher.NewFinalizer(timeout, loopInterval, queue, ethClient, rpcClient, 1, 1, 1, logger, metrics.FinalizerMetrics)

requestedAt := uint64(time.Now().UnixNano())
blob := makeTestBlob([]*core.SecurityParam{{
QuorumID: 0,
AdversaryThreshold: 80,
}})
ctx := context.Background()
metadataKey, err := queue.StoreBlob(ctx, &blob, requestedAt)
metadataKey1, err := queue.StoreBlob(ctx, &blob, requestedAt)
assert.NoError(t, err)
metadataKey2, err := queue.StoreBlob(ctx, &blob, requestedAt+1)
assert.NoError(t, err)
batchHeaderHash := [32]byte{1, 2, 3}
blobIndex := uint32(10)
@@ -66,9 +69,9 @@
ConfirmationBlockNumber: uint32(150),
Fee: []byte{0},
}
metadata := &disperser.BlobMetadata{
BlobHash: metadataKey.BlobHash,
MetadataHash: metadataKey.MetadataHash,
metadata1 := &disperser.BlobMetadata{
BlobHash: metadataKey1.BlobHash,
MetadataHash: metadataKey1.MetadataHash,
BlobStatus: disperser.Processing,
Expiry: 0,
NumRetries: 0,
@@ -79,7 +82,23 @@
RequestedAt: requestedAt,
},
}
m, err := queue.MarkBlobConfirmed(ctx, metadata, confirmationInfo)
metadata2 := &disperser.BlobMetadata{
BlobHash: metadataKey2.BlobHash,
MetadataHash: metadataKey2.MetadataHash,
BlobStatus: disperser.Processing,
Expiry: 0,
NumRetries: 0,
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: core.BlobRequestHeader{
SecurityParams: blob.RequestHeader.SecurityParams,
},
RequestedAt: requestedAt + 1,
},
}
m, err := queue.MarkBlobConfirmed(ctx, metadata1, confirmationInfo)
assert.Equal(t, disperser.Confirmed, m.BlobStatus)
assert.NoError(t, err)
m, err = queue.MarkBlobConfirmed(ctx, metadata2, confirmationInfo)
assert.Equal(t, disperser.Confirmed, m.BlobStatus)
assert.NoError(t, err)

@@ -92,12 +111,14 @@

metadatas, err = queue.GetBlobMetadataByStatus(ctx, disperser.Finalized)
assert.NoError(t, err)
assert.Len(t, metadatas, 1)
assert.Len(t, metadatas, 2)

assert.Equal(t, metadatas[0].BlobHash, metadataKey.BlobHash)
assert.ElementsMatch(t, []string{metadatas[0].BlobHash, metadatas[1].BlobHash}, []string{metadataKey1.BlobHash, metadataKey2.BlobHash})
assert.Equal(t, metadatas[0].BlobStatus, disperser.Finalized)
assert.Equal(t, metadatas[0].RequestMetadata.RequestedAt, requestedAt)
assert.Equal(t, metadatas[1].BlobStatus, disperser.Finalized)
assert.ElementsMatch(t, []uint64{metadatas[0].RequestMetadata.RequestedAt, metadatas[1].RequestMetadata.RequestedAt}, []uint64{requestedAt, requestedAt + 1})
assert.Equal(t, metadatas[0].RequestMetadata.SecurityParams, blob.RequestHeader.SecurityParams)
assert.Equal(t, metadatas[1].RequestMetadata.SecurityParams, blob.RequestHeader.SecurityParams)
}

func TestUnfinalizedBlob(t *testing.T) {
@@ -117,7 +138,8 @@ func TestUnfinalizedBlob(t *testing.T) {
BlockNumber: new(big.Int).SetUint64(1_000_100),
}, nil)

finalizer := batcher.NewFinalizer(timeout, loopInterval, queue, ethClient, rpcClient, 1, logger)
metrics := batcher.NewMetrics("9100", logger)
finalizer := batcher.NewFinalizer(timeout, loopInterval, queue, ethClient, rpcClient, 1, 1, 1, logger, metrics.FinalizerMetrics)

requestedAt := uint64(time.Now().UnixNano())
blob := makeTestBlob([]*core.SecurityParam{{
@@ -187,7 +209,8 @@ func TestNoReceipt(t *testing.T) {
}).Return(nil)
ethClient.On("TransactionReceipt", m.Anything, m.Anything).Return(nil, ethereum.NotFound)

finalizer := batcher.NewFinalizer(timeout, loopInterval, queue, ethClient, rpcClient, 1, logger)
metrics := batcher.NewMetrics("9100", logger)
finalizer := batcher.NewFinalizer(timeout, loopInterval, queue, ethClient, rpcClient, 1, 1, 1, logger, metrics.FinalizerMetrics)

requestedAt := uint64(time.Now().UnixNano())
blob := makeTestBlob([]*core.SecurityParam{{
51 changes: 51 additions & 0 deletions disperser/batcher/metrics.go
@@ -43,9 +43,16 @@ type TxnManagerMetrics struct {
NumTx *prometheus.CounterVec
}

type FinalizerMetrics struct {
NumBlobs *prometheus.CounterVec
LastSeenFinalizedBlock prometheus.Gauge
Latency *prometheus.SummaryVec
}

type Metrics struct {
*EncodingStreamerMetrics
*TxnManagerMetrics
*FinalizerMetrics

registry *prometheus.Registry

@@ -115,9 +122,37 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics {
),
}

finalizerMetrics := FinalizerMetrics{
NumBlobs: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "finalizer_num_blobs",
Help: "number of blobs in each state",
},
[]string{"state"}, // possible values are "processed", "failed", "finalized"
),
LastSeenFinalizedBlock: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "last_finalized_block",
Help: "last finalized block number",
},
),
Latency: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "finalizer_process_latency_ms",
Help: "finalizer process latency summary in milliseconds",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
},
[]string{"stage"}, // possible values are "round" and "total"
),
}

metrics := &Metrics{
EncodingStreamerMetrics: &encodingStreamerMetrics,
TxnManagerMetrics: &txnManagerMetrics,
FinalizerMetrics: &finalizerMetrics,
Blob: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
@@ -248,3 +283,19 @@ func (t *TxnManagerMetrics) UpdateTxQueue(txQueue int) {
func (t *TxnManagerMetrics) IncrementTxnCount(state string) {
t.NumTx.WithLabelValues(state).Inc()
}

func (f *FinalizerMetrics) IncrementNumBlobs(state string) {
f.NumBlobs.WithLabelValues(state).Inc()
}

func (f *FinalizerMetrics) UpdateNumBlobs(state string, count int) {
f.NumBlobs.WithLabelValues(state).Add(float64(count))
}

func (f *FinalizerMetrics) UpdateLastSeenFinalizedBlock(blockNumber uint64) {
f.LastSeenFinalizedBlock.Set(float64(blockNumber))
}

func (f *FinalizerMetrics) ObserveLatency(stage string, latencyMs float64) {
f.Latency.WithLabelValues(stage).Observe(latencyMs)
}
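For a self-contained look at what the new FinalizerMetrics export, here is a sketch against the Prometheus client API. The namespace string and the observed values are placeholders; only the metric names, label values, and summary objectives mirror the diff. In the objectives map, each key is a quantile to track and each value is the permitted estimation error for it.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	numBlobs := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "eigenda_batcher", // placeholder namespace
		Name:      "finalizer_num_blobs",
		Help:      "number of blobs in each state",
	}, []string{"state"})

	latency := prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Namespace: "eigenda_batcher", // placeholder namespace
		Name:      "finalizer_process_latency_ms",
		Help:      "finalizer process latency summary in milliseconds",
		// Track median and tail quantiles with their allowed error.
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
	}, []string{"stage"})

	reg.MustRegister(numBlobs, latency)

	// Mirrors the IncrementNumBlobs / UpdateNumBlobs / ObserveLatency helpers:
	numBlobs.WithLabelValues("finalized").Inc()
	numBlobs.WithLabelValues("processed").Add(42)
	latency.WithLabelValues("round").Observe(12.5)  // one blob round
	latency.WithLabelValues("total").Observe(480.0) // whole FinalizeBlobs pass

	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Println(mf.GetName())
	}
}
```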
1 change: 1 addition & 0 deletions disperser/cmd/batcher/config.go
@@ -45,6 +45,7 @@ func NewConfig(ctx *cli.Context) Config {
BatcherConfig: batcher.Config{
PullInterval: ctx.GlobalDuration(flags.PullIntervalFlag.Name),
FinalizerInterval: ctx.GlobalDuration(flags.FinalizerIntervalFlag.Name),
FinalizerPoolSize: ctx.GlobalInt(flags.FinalizerPoolSizeFlag.Name),
EncoderSocket: ctx.GlobalString(flags.EncoderSocket.Name),
NumConnections: ctx.GlobalInt(flags.NumConnectionsFlag.Name),
EncodingRequestQueueSize: ctx.GlobalInt(flags.EncodingRequestQueueSizeFlag.Name),
8 changes: 8 additions & 0 deletions disperser/cmd/batcher/flags/flags.go
@@ -134,6 +134,13 @@
EnvVar: common.PrefixEnvVar(envVarPrefix, "FINALIZER_INTERVAL"),
Value: 6 * time.Minute,
}
FinalizerPoolSizeFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "finalizer-pool-size"),
Usage: "Size of the finalizer workerpool",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "FINALIZER_POOL_SIZE"),
Value: 4,
}
EncodingRequestQueueSizeFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "encoding-request-queue-size"),
Usage: "Size of the encoding request queue",
@@ -196,6 +203,7 @@ var optionalFlags = []cli.Flag{
ChainWriteTimeoutFlag,
NumConnectionsFlag,
FinalizerIntervalFlag,
FinalizerPoolSizeFlag,
EncodingRequestQueueSizeFlag,
MaxNumRetriesPerBlobFlag,
TargetNumChunksFlag,
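The new flag follows the existing pattern: optional, overridable via environment variable, defaulting to 4. As a rough sketch of how such a flag resolves under the urfave/cli v1 API this repo uses (the flag and env var names below are illustrative approximations of what common.PrefixFlag and common.PrefixEnvVar would produce, not the exact strings):

```go
package main

import (
	"fmt"
	"os"

	"github.com/urfave/cli"
)

func main() {
	app := cli.NewApp()
	app.Flags = []cli.Flag{
		cli.IntFlag{
			Name:   "batcher.finalizer-pool-size", // approximation of the prefixed flag name
			Usage:  "Size of the finalizer workerpool",
			EnvVar: "BATCHER_FINALIZER_POOL_SIZE", // approximation of the prefixed env var
			Value:  4,                             // default when neither flag nor env is set
		},
	}
	app.Action = func(ctx *cli.Context) error {
		// Resolution order in urfave/cli v1: command line beats env var beats default.
		fmt.Println("finalizer pool size:", ctx.GlobalInt("batcher.finalizer-pool-size"))
		return nil
	}
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```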
2 changes: 1 addition & 1 deletion disperser/cmd/batcher/main.go
@@ -140,7 +140,7 @@ func RunBatcher(ctx *cli.Context) error {
if err != nil {
return err
}
finalizer := batcher.NewFinalizer(config.TimeoutConfig.ChainReadTimeout, config.BatcherConfig.FinalizerInterval, queue, client, rpcClient, config.BatcherConfig.MaxNumRetriesPerBlob, logger)
finalizer := batcher.NewFinalizer(config.TimeoutConfig.ChainReadTimeout, config.BatcherConfig.FinalizerInterval, queue, client, rpcClient, config.BatcherConfig.MaxNumRetriesPerBlob, 1000, config.BatcherConfig.FinalizerPoolSize, logger, metrics.FinalizerMetrics)
txnManager := batcher.NewTxnManager(client, 20, config.TimeoutConfig.ChainWriteTimeout, logger, metrics.TxnManagerMetrics)
batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics)
if err != nil {
23 changes: 16 additions & 7 deletions disperser/common/inmem/store.go
@@ -159,14 +159,27 @@ func (q *BlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, s
metas := make([]*disperser.BlobMetadata, 0)
foundStart := exclusiveStartKey == nil

for _, meta := range q.Metadata {
keys := make([]disperser.BlobKey, len(q.Metadata))
i := 0
for k := range q.Metadata {
keys[i] = k
i++
}
sort.Slice(keys, func(i, j int) bool {
return q.Metadata[keys[i]].RequestMetadata.RequestedAt < q.Metadata[keys[j]].RequestMetadata.RequestedAt
})
for _, key := range keys {
meta := q.Metadata[key]
if meta.BlobStatus == status {
if foundStart {
metas = append(metas, meta)
if len(metas) == int(limit) {
break
return metas, &disperser.BlobStoreExclusiveStartKey{
BlobStatus: int32(meta.BlobStatus),
RequestedAt: int64(meta.RequestMetadata.RequestedAt),
}, nil
}
} else if meta.BlobStatus == disperser.BlobStatus(exclusiveStartKey.BlobStatus) && meta.RequestMetadata.RequestedAt == uint64(exclusiveStartKey.RequestedAt) {
} else if meta.BlobStatus == disperser.BlobStatus(exclusiveStartKey.BlobStatus) && meta.RequestMetadata.RequestedAt > uint64(exclusiveStartKey.RequestedAt) {
foundStart = true // Found the starting point, start appending metas from next item
metas = append(metas, meta)
if len(metas) == int(limit) {
@@ -179,10 +192,6 @@
}
}

sort.SliceStable(metas, func(i, j int) bool {
return metas[i].RequestMetadata.RequestedAt < metas[j].RequestMetadata.RequestedAt
})

// Return all the metas if limit is not reached
return metas, nil, nil
}
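The in-memory store's pagination is reworked to scan in a deterministic order: keys are sorted by RequestedAt up front, a continuation key is returned as soon as a page fills, and a resumed scan picks up strictly after that key (hence the comparison changing from == to >, which also tolerates the exact start item changing status between calls). A simplified, self-contained sketch of the same semantics, with invented types standing in for BlobMetadata and BlobStoreExclusiveStartKey:

```go
package main

import (
	"fmt"
	"sort"
)

// meta and startKey are simplified stand-ins for disperser.BlobMetadata
// and disperser.BlobStoreExclusiveStartKey.
type meta struct{ RequestedAt int64 }

type startKey struct{ RequestedAt int64 }

// page returns up to limit items ordered by RequestedAt, resuming strictly
// after start; a nil returned key means the scan is complete.
func page(all []meta, limit int, start *startKey) ([]meta, *startKey) {
	sort.Slice(all, func(i, j int) bool { return all[i].RequestedAt < all[j].RequestedAt })
	out := make([]meta, 0, limit)
	for _, m := range all {
		// Exclusive start: skip everything at or before the key.
		if start != nil && m.RequestedAt <= start.RequestedAt {
			continue
		}
		out = append(out, m)
		if len(out) == limit {
			// Page is full: hand back a key so the caller can resume after m.
			return out, &startKey{RequestedAt: m.RequestedAt}
		}
	}
	return out, nil // partial (or empty) final page
}

func main() {
	all := []meta{{3}, {1}, {5}, {2}, {4}}
	var key *startKey
	for {
		items, next := page(all, 2, key)
		fmt.Println("page:", items)
		if next == nil {
			break
		}
		key = next
	}
}
```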