From 0b5e1199f9eff55b90ddd96d28c2a8edfa15074a Mon Sep 17 00:00:00 2001
From: Patrick Schork <354473+pschork@users.noreply.github.com>
Date: Tue, 13 Aug 2024 18:56:43 -0700
Subject: [PATCH] Enable minibatcher

Refactor batcher/minibatcher creation to use generic interface
Fix integration test
Fix tests
Update batcher interface type cast
Move minibatcher tablename check to earlier init phase
Make minibatcher table name optional
Add info output at startup
Lint
Remove log output
Rename MinibatchStore to Batchstore
Remove generic batch/minibatcher interface
Add orchestrator abstraction
Add metrics
Add finalizer
Refactor and consolidate encoding client/streamer init
Revert batcher to original
Move minibatcher branch logic to main
Move finalizer, encodingstreamer, chain state, assignment mgr into orchestrator
Add batchConfirmer to orchestrator
Move signalLiveness into orchestrator
Add transactionMgr startup to orchestrator
Lint
Lint
Lint
Lint
---
 disperser/batcher/batch_confirmer.go          |  10 +-
 disperser/batcher/batch_confirmer_test.go     |   7 +-
 disperser/batcher/batcher.go                  |  10 +
 disperser/batcher/batcher_test.go             |   2 -
 .../batcher/batchstore/minibatch_store.go     |   2 +-
 .../batchstore/minibatch_store_test.go        |   2 +-
 disperser/batcher/minibatcher.go              |  96 ++------
 disperser/batcher/minibatcher_test.go         |  87 +++----
 disperser/batcher/orchestrator.go             | 216 ++++++++++++++++++
 disperser/cmd/batcher/config.go               |  37 +--
 disperser/cmd/batcher/flags/flags.go          |  23 ++
 disperser/cmd/batcher/main.go                 |  22 +-
 12 files changed, 358 insertions(+), 156 deletions(-)
 create mode 100644 disperser/batcher/orchestrator.go

diff --git a/disperser/batcher/batch_confirmer.go b/disperser/batcher/batch_confirmer.go
index be68e467f4..1ab2853362 100644
--- a/disperser/batcher/batch_confirmer.go
+++ b/disperser/batcher/batch_confirmer.go
@@ -25,7 +25,7 @@ type BatchConfirmerConfig struct {
 	DispersalStatusCheckInterval time.Duration
 	AttestationTimeout           time.Duration
 	SRSOrder                     int
-	NumConnections               int
+	NumConnections               uint
 	MaxNumRetriesPerBlob         uint
 }
@@ -46,6 +46,7 @@ type BatchConfirmer struct {

 	ethClient common.EthClient
 	logger    logging.Logger
+	Metrics   *Metrics
 }

 func NewBatchConfirmer(
@@ -62,6 +63,7 @@ func NewBatchConfirmer(
 	txnManager TxnManager,
 	minibatcher *Minibatcher,
 	logger logging.Logger,
+	metrics *Metrics,
 ) (*BatchConfirmer, error) {
 	return &BatchConfirmer{
 		BatchConfirmerConfig: config,
@@ -76,6 +78,7 @@ func NewBatchConfirmer(
 		Transactor:           transactor,
 		TransactionManager:   txnManager,
 		Minibatcher:          minibatcher,
+		Metrics:              metrics,

 		ethClient: ethClient,
 		logger:    logger.With("component", "BatchConfirmer"),
@@ -285,6 +288,11 @@ func (b *BatchConfirmer) handleFailure(ctx context.Context, batchID uuid.UUID, b
 			continue
 		}

+		if reason == FailNoSignatures {
+			b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.InsufficientSignatures)
+		} else {
+			b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.Failed)
+		}
 		numPermanentFailures++
 	}
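Note on the hunk above: handleFailure now records a terminal per-blob status once retries are exhausted, distinguishing batches that gathered some but not enough signatures from outright failures. A self-contained sketch of the same mapping, using stand-in names (classify, failNoSignatures, and the status values are illustrative, not the codebase's real constants):

    package main

    import "fmt"

    // Stand-in for batcher.FailNoSignatures.
    const failNoSignatures = "no_signatures"

    // Stand-in for disperser.BlobStatus.
    type blobStatus int

    const (
    	insufficientSignatures blobStatus = iota
    	failed
    )

    // classify mirrors the branch added to handleFailure: a batch that was
    // dispersed but did not gather enough signatures is recorded separately
    // from any other permanent failure.
    func classify(reason string) blobStatus {
    	if reason == failNoSignatures {
    		return insufficientSignatures
    	}
    	return failed
    }

    func main() {
    	fmt.Println(classify("no_signatures"), classify("timeout")) // 0 1
    }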
dmock "github.com/Layr-Labs/eigenda/disperser/mock" "github.com/Layr-Labs/eigenda/encoding" @@ -57,7 +56,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents { dispatcher := dmock.NewDispatcher(state) blobStore := inmem.NewBlobStore() ethClient := &cmock.MockEthClient{} - txnManager := batmock.NewTxnManager() + txnManager := batchermock.NewTxnManager() minibatchStore := batcherinmem.NewMinibatchStore(logger) encodingWorkerPool := workerpool.New(10) encoderProver, err = makeTestProver() @@ -76,7 +75,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents { MaxNumConnections: 10, MaxNumRetriesPerBlob: 2, MaxNumRetriesPerDispersal: 1, - }, blobStore, minibatchStore, dispatcher, mockChainState, assignmentCoordinator, encodingStreamer, ethClient, pool, logger) + }, blobStore, minibatchStore, dispatcher, encodingStreamer, pool, logger, metrics) assert.NoError(t, err) config := bat.BatchConfirmerConfig{ @@ -88,7 +87,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents { SRSOrder: 3000, MaxNumRetriesPerBlob: 2, } - b, err := bat.NewBatchConfirmer(config, blobStore, minibatchStore, dispatcher, mockChainState, assignmentCoordinator, encodingStreamer, agg, ethClient, transactor, txnManager, minibatcher, logger) + b, err := bat.NewBatchConfirmer(config, blobStore, minibatchStore, dispatcher, mockChainState, assignmentCoordinator, encodingStreamer, agg, ethClient, transactor, txnManager, minibatcher, logger, metrics) assert.NoError(t, err) ethClient.On("BlockNumber").Return(uint64(initialBlock), nil) diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index 343d781839..e70044c233 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -41,6 +41,7 @@ type QuorumInfo struct { type TimeoutConfig struct { EncodingTimeout time.Duration + DispersalTimeout time.Duration AttestationTimeout time.Duration ChainReadTimeout time.Duration ChainWriteTimeout time.Duration @@ -64,6 +65,11 @@ type Config struct { TargetNumChunks uint MaxBlobsToFetchFromStore int + + EnableMinibatch bool + BatchstoreTableName string + MinibatcherConfig MinibatcherConfig + DispersalStatusCheckInterval time.Duration } type Batcher struct { @@ -105,6 +111,10 @@ func NewBatcher( metrics *Metrics, heartbeatChan chan time.Time, ) (*Batcher, error) { + if config.EnableMinibatch { + return nil, errors.New("minibatch is not supported") + } + batchTrigger := NewEncodedSizeNotifier( make(chan struct{}, 1), uint64(config.BatchSizeMBLimit)*1024*1024, // convert to bytes diff --git a/disperser/batcher/batcher_test.go b/disperser/batcher/batcher_test.go index 0a2e8d91a7..1306b3773e 100644 --- a/disperser/batcher/batcher_test.go +++ b/disperser/batcher/batcher_test.go @@ -41,7 +41,6 @@ type batcherComponents struct { transactor *coremock.MockTransactor txnManager *batchermock.MockTxnManager blobStore disperser.BlobStore - encoderClient *disperser.LocalEncoderClient encodingStreamer *bat.EncodingStreamer ethClient *cmock.MockEthClient dispatcher *dmock.Dispatcher @@ -153,7 +152,6 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time. 
diff --git a/disperser/batcher/batcher_test.go b/disperser/batcher/batcher_test.go
index 0a2e8d91a7..1306b3773e 100644
--- a/disperser/batcher/batcher_test.go
+++ b/disperser/batcher/batcher_test.go
@@ -41,7 +41,6 @@ type batcherComponents struct {
 	transactor       *coremock.MockTransactor
 	txnManager       *batchermock.MockTxnManager
 	blobStore        disperser.BlobStore
-	encoderClient    *disperser.LocalEncoderClient
 	encodingStreamer *bat.EncodingStreamer
 	ethClient        *cmock.MockEthClient
 	dispatcher       *dmock.Dispatcher
@@ -153,7 +152,6 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time.
 		transactor:       transactor,
 		txnManager:       txnManager,
 		blobStore:        blobStore,
-		encoderClient:    encoderClient,
 		encodingStreamer: b.EncodingStreamer,
 		ethClient:        ethClient,
 		dispatcher:       dispatcher,
diff --git a/disperser/batcher/batchstore/minibatch_store.go b/disperser/batcher/batchstore/minibatch_store.go
index 97bb94bf37..3d42540fb3 100644
--- a/disperser/batcher/batchstore/minibatch_store.go
+++ b/disperser/batcher/batchstore/minibatch_store.go
@@ -35,7 +35,7 @@ type MinibatchStore struct {

 var _ batcher.MinibatchStore = (*MinibatchStore)(nil)

-func NewMinibatchStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *MinibatchStore {
+func NewMinibatchStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) batcher.MinibatchStore {
 	logger.Debugf("creating minibatch store with table %s with TTL: %s", tableName, ttl)
 	return &MinibatchStore{
 		dynamoDBClient: dynamoDBClient,
diff --git a/disperser/batcher/batchstore/minibatch_store_test.go b/disperser/batcher/batchstore/minibatch_store_test.go
index f85b505a87..27583f0de7 100644
--- a/disperser/batcher/batchstore/minibatch_store_test.go
+++ b/disperser/batcher/batchstore/minibatch_store_test.go
@@ -35,7 +35,7 @@ var (
 	localStackPort = "4566"

 	dynamoClient   *dynamodb.Client
-	minibatchStore *batchstore.MinibatchStore
+	minibatchStore batcher.MinibatchStore

 	UUID               = uuid.New()
 	minibatchTableName = fmt.Sprintf("test-MinibatchStore-%v", UUID)
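NewMinibatchStore now returns the batcher.MinibatchStore interface rather than the concrete *MinibatchStore, so the test variable above can hold either the DynamoDB-backed or the in-memory implementation. The var _ line kept in the file is the usual Go idiom for checking interface satisfaction at compile time. A self-contained sketch of the pattern (Store, memStore, and NewStore are hypothetical names):

    package main

    import "fmt"

    // Store is a stand-in for batcher.MinibatchStore.
    type Store interface {
    	Put(key, value string) error
    }

    type memStore struct{ m map[string]string }

    // Compile-time check: *memStore must satisfy Store, or the build fails.
    var _ Store = (*memStore)(nil)

    func (s *memStore) Put(key, value string) error {
    	s.m[key] = value
    	return nil
    }

    // NewStore returns the interface, mirroring NewMinibatchStore above, so
    // callers can swap implementations without changing their own types.
    func NewStore() Store { return &memStore{m: map[string]string{}} }

    func main() {
    	s := NewStore()
    	_ = s.Put("batch", "pending")
    	fmt.Println("stored")
    }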
logger.With("component", "Minibatcher"), + Metrics: metrics, }, nil } -func (b *Minibatcher) Start(ctx context.Context) error { - err := b.ChainState.Start(ctx) - if err != nil { - return err - } - // Wait for few seconds for indexer to index blockchain - // This won't be needed when we switch to using Graph node - time.Sleep(indexerWarmupDelay) - go func() { - ticker := time.NewTicker(b.PullInterval) - defer ticker.Stop() - cancelFuncs := make([]context.CancelFunc, 0) - for { - select { - case <-ctx.Done(): - for _, cancel := range cancelFuncs { - cancel() - } - return - case <-ticker.C: - cancel, err := b.HandleSingleMinibatch(ctx) - if err != nil { - if errors.Is(err, errNoEncodedResults) { - b.logger.Warn("no encoded results to make a batch with") - } else { - b.logger.Error("failed to process a batch", "err", err) - } - } - if cancel != nil { - cancelFuncs = append(cancelFuncs, cancel) - } - } - } - }() - - return nil +func (b *Minibatcher) RecoverState(ctx context.Context) error { + //TODO: Implement minibatch recovery + return fmt.Errorf("minibatch state recovery not implemented") } func (b *Minibatcher) PopBatchState(batchID uuid.UUID) *BatchState { @@ -136,7 +96,7 @@ func (b *Minibatcher) handleFailure(ctx context.Context, blobMetadatas []*disper b.EncodingStreamer.RemoveEncodedBlob(metadata) retry, err := b.BlobStore.HandleBlobFailure(ctx, metadata, b.MaxNumRetriesPerBlob) if err != nil { - b.logger.Error("HandleSingleBatch: error handling blob failure", "err", err) + b.logger.Error("error handling blobstore blob failure", "err", err) // Append the error result = multierror.Append(result, err) } @@ -153,6 +113,7 @@ func (b *Minibatcher) handleFailure(ctx context.Context, blobMetadatas []*disper func (b *Minibatcher) HandleSingleMinibatch(ctx context.Context) (context.CancelFunc, error) { log := b.logger + // If too many dispersal requests are pending, skip an iteration if pending := b.Pool.WaitingQueueSize(); pending > int(b.MaxNumConnections) { return nil, fmt.Errorf("too many pending requests %d with max number of connections %d. 
skipping minibatch iteration", pending, b.MaxNumConnections) @@ -218,16 +179,7 @@ func (b *Minibatcher) HandleSingleMinibatch(ctx context.Context) (context.Cancel err := b.createBlobMinibatchMappings(ctx, b.CurrentBatchID, b.MinibatchIndex, minibatch.BlobMetadata, minibatch.BlobHeaders) storeMappingsChan <- err }() - - // Disperse the minibatch to operators in all quorums - // If an operator doesn't have any bundles, it won't receive any chunks but it will still receive blob headers - operatorsAllQuorums, err := b.ChainState.GetIndexedOperators(ctx, b.ReferenceBlockNumber) - if err != nil { - cancelDispersal() - _ = b.handleFailure(ctx, minibatch.BlobMetadata, FailReason("error getting operator state for all quorums")) - return nil, fmt.Errorf("error getting operator state for all quorums: %w", err) - } - b.DisperseBatch(dispersalCtx, operatorsAllQuorums, minibatch.EncodedBlobs, minibatch.BatchHeader, b.CurrentBatchID, b.MinibatchIndex) + b.DisperseBatch(dispersalCtx, minibatch.State, minibatch.EncodedBlobs, minibatch.BatchHeader, b.CurrentBatchID, b.MinibatchIndex) log.Debug("DisperseBatch took", "duration", time.Since(stageTimer).String()) h, err := minibatch.State.OperatorState.Hash() @@ -252,8 +204,8 @@ func (b *Minibatcher) HandleSingleMinibatch(ctx context.Context) (context.Cancel return cancelDispersal, nil } -func (b *Minibatcher) DisperseBatch(ctx context.Context, operators map[core.OperatorID]*core.IndexedOperatorInfo, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, batchID uuid.UUID, minibatchIndex uint) { - for id, op := range operators { +func (b *Minibatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, batchID uuid.UUID, minibatchIndex uint) { + for id, op := range state.IndexedOperators { opInfo := op opID := id req := &MinibatchDispersal{ diff --git a/disperser/batcher/minibatcher_test.go b/disperser/batcher/minibatcher_test.go index 73b099624e..0f347021cf 100644 --- a/disperser/batcher/minibatcher_test.go +++ b/disperser/batcher/minibatcher_test.go @@ -8,7 +8,6 @@ import ( "time" cmock "github.com/Layr-Labs/eigenda/common/mock" - commonmock "github.com/Layr-Labs/eigenda/common/mock" "github.com/Layr-Labs/eigenda/core" coremock "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/disperser" @@ -27,7 +26,6 @@ import ( var ( opId0, _ = core.OperatorIDFromHex("e22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311") opId1, _ = core.OperatorIDFromHex("e23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568312") - opId2, _ = core.OperatorIDFromHex("e23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568313") mockChainState, _ = coremock.NewChainDataMock(map[uint8]map[core.OperatorID]int{ 0: { opId0: 1, @@ -36,9 +34,6 @@ var ( 1: { opId0: 1, }, - 2: { - opId2: 1, - }, }) defaultConfig = batcher.MinibatcherConfig{ PullInterval: 1 * time.Second, @@ -52,16 +47,15 @@ const ( ) type minibatcherComponents struct { - minibatcher *batcher.Minibatcher - blobStore disperser.BlobStore - minibatchStore batcher.MinibatchStore - dispatcher *dmock.Dispatcher - chainState *coremock.ChainDataMock - assignmentCoordinator core.AssignmentCoordinator - encodingStreamer *batcher.EncodingStreamer - pool *workerpool.WorkerPool - ethClient *commonmock.MockEthClient - logger logging.Logger + minibatcher *batcher.Minibatcher + blobStore disperser.BlobStore + minibatchStore batcher.MinibatchStore + dispatcher *dmock.Dispatcher + chainState *core.IndexedOperatorState + 
@@ -171,12 +161,10 @@ func TestDisperseMinibatch(t *testing.T) {
 	// Check the dispersal records
 	dispersal, err := c.minibatchStore.GetDispersal(ctx, c.minibatcher.CurrentBatchID, 0, opId0)
 	assert.NoError(t, err)
-	operatorState, err := c.chainState.GetIndexedOperatorState(context.Background(), 0, []core.QuorumID{0, 1})
-	assert.NoError(t, err)
 	assert.Equal(t, dispersal.BatchID, c.minibatcher.CurrentBatchID)
 	assert.Equal(t, dispersal.MinibatchIndex, uint(0))
 	assert.Equal(t, dispersal.OperatorID, opId0)
-	assert.Equal(t, dispersal.Socket, operatorState.IndexedOperators[opId0].Socket)
+	assert.Equal(t, dispersal.Socket, c.chainState.IndexedOperators[opId0].Socket)
 	assert.Equal(t, dispersal.NumBlobs, uint(2))
 	assert.NotNil(t, dispersal.RequestedAt)
@@ -185,7 +173,7 @@ func TestDisperseMinibatch(t *testing.T) {
 	assert.Equal(t, dispersal.BatchID, c.minibatcher.CurrentBatchID)
 	assert.Equal(t, dispersal.MinibatchIndex, uint(0))
 	assert.Equal(t, dispersal.OperatorID, opId1)
-	assert.Equal(t, dispersal.Socket, operatorState.IndexedOperators[opId1].Socket)
+	assert.Equal(t, dispersal.Socket, c.chainState.IndexedOperators[opId1].Socket)
 	assert.Equal(t, dispersal.NumBlobs, uint(2))
 	assert.NotNil(t, dispersal.RequestedAt)
@@ -342,11 +330,11 @@ func TestDisperseMinibatch(t *testing.T) {
 	assert.Len(t, c.minibatcher.Batches, 1)
 	assert.Nil(t, c.minibatcher.Batches[b.ID])

-	c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 9)
+	c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 6)
 	dispersals, err := c.minibatchStore.GetDispersalsByMinibatch(ctx, b.ID, 0)
 	assert.NoError(t, err)
-	assert.Len(t, dispersals, 3)
-	opIDs := make([]core.OperatorID, 3)
+	assert.Len(t, dispersals, 2)
+	opIDs := make([]core.OperatorID, 2)
 	for i, dispersal := range dispersals {
 		assert.Equal(t, dispersal.BatchID, b.ID)
 		assert.Equal(t, dispersal.MinibatchIndex, uint(0))
@@ -358,7 +346,7 @@ func TestDisperseMinibatch(t *testing.T) {
 		assert.NoError(t, dispersal.Error)
 		assert.Len(t, dispersal.Signatures, 1)
 	}
-	assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1, opId2})
+	assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1})
 }
 func TestDisperseMinibatchFailure(t *testing.T) {
@@ -416,11 +404,11 @@ func TestDisperseMinibatchFailure(t *testing.T) {
 	assert.Equal(t, c.minibatcher.ReferenceBlockNumber, b.ReferenceBlockNumber)

 	c.pool.StopWait()
-	c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 3)
+	c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 2)
 	dispersals, err := c.minibatchStore.GetDispersalsByMinibatch(ctx, c.minibatcher.CurrentBatchID, 0)
 	assert.NoError(t, err)
-	assert.Len(t, dispersals, 3)
-	opIDs := make([]core.OperatorID, 3)
+	assert.Len(t, dispersals, 2)
+	opIDs := make([]core.OperatorID, 2)
 	for i, dispersal := range dispersals {
 		assert.Equal(t, dispersal.BatchID, c.minibatcher.CurrentBatchID)
 		assert.Equal(t, dispersal.MinibatchIndex, uint(0))
@@ -432,7 +420,7 @@ func TestDisperseMinibatchFailure(t *testing.T) {
 		assert.NoError(t, dispersal.Error)
 		assert.Len(t, dispersal.Signatures, 1)
 	}
-	assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1, opId2})
+	assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1})
 }

 func TestSendBlobsToOperatorWithRetries(t *testing.T) {
@@ -483,17 +471,10 @@ func TestSendBlobsToOperatorWithRetries(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Len(t, signatures, 1)

-	c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fail")).Twice()
-	c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*core.Signature{sig}, nil).Once()
-	signatures, err = c.minibatcher.SendBlobsToOperatorWithRetries(ctx, batch.EncodedBlobs, batch.BatchHeader, batch.State.IndexedOperators[opId2], opId2, 3)
-	c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 6)
-	assert.NoError(t, err)
-	assert.Len(t, signatures, 1)
-
 	c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fail")).Times(3)
 	c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*core.Signature{sig}, nil).Once()
 	signatures, err = c.minibatcher.SendBlobsToOperatorWithRetries(ctx, batch.EncodedBlobs, batch.BatchHeader, batch.State.IndexedOperators[opId1], opId1, 3)
-	c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 9)
+	c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 6)
 	assert.ErrorContains(t, err, "failed to send chunks to operator")
 	assert.Nil(t, signatures)
 }
 func TestSendBlobsToOperatorWithRetriesCanceled(t *testing.T) {
@@ -516,17 +497,15 @@ func TestSendBlobsToOperatorWithRetriesCanceled(t *testing.T) {
 	assert.NoError(t, err)
 	batch, err := c.encodingStreamer.CreateMinibatch(ctx)
 	assert.NoError(t, err)
-	operators, err := c.chainState.GetIndexedOperators(ctx, initialBlock)
-	assert.NoError(t, err)
 	minibatchIndex := uint(12)
 	c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, context.Canceled)
-	c.minibatcher.DisperseBatch(ctx, operators, batch.EncodedBlobs, batch.BatchHeader, c.minibatcher.CurrentBatchID, minibatchIndex)
+	c.minibatcher.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader, c.minibatcher.CurrentBatchID, minibatchIndex)
 	c.pool.StopWait()
 	dispersals, err := c.minibatchStore.GetDispersalsByMinibatch(ctx, c.minibatcher.CurrentBatchID, minibatchIndex)
 	assert.NoError(t, err)
-	assert.Len(t, dispersals, 3)
+	assert.Len(t, dispersals, 2)

-	indexedState, err := mockChainState.GetIndexedOperatorState(ctx, initialBlock, []core.QuorumID{0, 1, 2})
+	indexedState, err := mockChainState.GetIndexedOperatorState(ctx, initialBlock, []core.QuorumID{0})
 	assert.NoError(t, err)
 	assert.Len(t, dispersals, len(indexedState.IndexedOperators))
 	for _, dispersal := range dispersals {
@@ -540,7 +519,7 @@ func TestMinibatcherTooManyPendingRequests(t *testing.T) {
 	ctx := context.Background()
 	mockWorkerPool := &cmock.MockWorkerpool{}
 	// minibatcher with mock worker pool
-	m, err := batcher.NewMinibatcher(defaultConfig, c.blobStore, c.minibatchStore, c.dispatcher, c.minibatcher.ChainState, c.assignmentCoordinator, c.encodingStreamer, c.ethClient, mockWorkerPool, c.logger)
+	m, err := batcher.NewMinibatcher(defaultConfig, c.blobStore, c.minibatchStore, c.dispatcher, c.encodingStreamer, mockWorkerPool, c.logger, c.metrics)
 	assert.NoError(t, err)
 	mockWorkerPool.On("WaitingQueueSize").Return(int(defaultConfig.MaxNumConnections + 1)).Once()
 	_, err = m.HandleSingleMinibatch(ctx)
diff --git a/disperser/batcher/orchestrator.go b/disperser/batcher/orchestrator.go
new file mode 100644
index 0000000000..a24528494a
--- /dev/null
+++ b/disperser/batcher/orchestrator.go
@@ -0,0 +1,216 @@
+package batcher
+
+import (
+	"context"
+	"errors"
+	"time"
+
+	"github.com/Layr-Labs/eigenda/common"
+	"github.com/Layr-Labs/eigenda/core"
+	"github.com/Layr-Labs/eigenda/disperser"
+	"github.com/Layr-Labs/eigensdk-go/logging"
+	"github.com/gammazero/workerpool"
+)
+
+type Orchestrator struct {
+	Config
+	TimeoutConfig
+
+	Queue          disperser.BlobStore
+	MinibatchStore MinibatchStore
+	Dispatcher     disperser.Dispatcher
+	EncoderClient  disperser.EncoderClient
+
+	ChainState            core.IndexedChainState
+	AssignmentCoordinator core.AssignmentCoordinator
+	Aggregator            core.SignatureAggregator
+	EncodingStreamer      *EncodingStreamer
+	Transactor            core.Transactor
+	TransactionManager    TxnManager
+	Metrics               *Metrics
+	HeartbeatChan         chan time.Time
+
+	ethClient common.EthClient
+	finalizer Finalizer
+	logger    logging.Logger
+
+	MiniBatcher    *Minibatcher
+	BatchConfirmer *BatchConfirmer
+}
+
+func NewOrchestrator(
+	config Config,
+	timeoutConfig TimeoutConfig,
+	queue disperser.BlobStore,
+	minibatchStore MinibatchStore,
+	dispatcher disperser.Dispatcher,
+	chainState core.IndexedChainState,
+	assignmentCoordinator core.AssignmentCoordinator,
+	encoderClient disperser.EncoderClient,
+	aggregator core.SignatureAggregator,
+	ethClient common.EthClient,
+	finalizer Finalizer,
+	transactor core.Transactor,
+	txnManager TxnManager,
+	logger logging.Logger,
+	metrics *Metrics,
+	heartbeatChan chan time.Time,
+) (*Orchestrator, error) {
+	batchTrigger := NewEncodedSizeNotifier(
+		make(chan struct{}, 1),
+		uint64(config.BatchSizeMBLimit)*1024*1024, // convert to bytes
+	)
+	streamerConfig := StreamerConfig{
+		SRSOrder:                 config.SRSOrder,
+		EncodingRequestTimeout:   config.PullInterval,
+		EncodingQueueLimit:       config.EncodingRequestQueueSize,
+		TargetNumChunks:          config.TargetNumChunks,
+		MaxBlobsToFetchFromStore: config.MaxBlobsToFetchFromStore,
+		FinalizationBlockDelay:   config.FinalizationBlockDelay,
+		ChainStateTimeout:        timeoutConfig.ChainStateTimeout,
+	}
+	encodingWorkerPool := workerpool.New(config.NumConnections)
+	encodingStreamer, err := NewEncodingStreamer(streamerConfig, queue, chainState, encoderClient, assignmentCoordinator, batchTrigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, metrics, logger)
+	if err != nil {
+		return nil, err
+	}
+
+	miniBatcher, err := NewMinibatcher(config.MinibatcherConfig, queue, minibatchStore, dispatcher, encodingStreamer, encodingWorkerPool, logger, metrics)
+	if err != nil {
+		return nil, err
+	}
+
+	batchConfirmerConfig := BatchConfirmerConfig{
+		PullInterval:                 config.PullInterval,
+		DispersalTimeout:             timeoutConfig.DispersalTimeout,
+		DispersalStatusCheckInterval: config.DispersalStatusCheckInterval,
+		AttestationTimeout:           timeoutConfig.AttestationTimeout,
+		SRSOrder:                     config.SRSOrder,
+		NumConnections:               config.MinibatcherConfig.MaxNumConnections,
+		MaxNumRetriesPerBlob:         config.MinibatcherConfig.MaxNumRetriesPerBlob,
+	}
+	batchConfirmer, err := NewBatchConfirmer(batchConfirmerConfig, queue, minibatchStore, dispatcher, chainState, assignmentCoordinator, encodingStreamer, aggregator, ethClient, transactor, txnManager, miniBatcher, logger, metrics)
+	if err != nil {
+		return nil, err
+	}
+
+	return &Orchestrator{
+		Config:        config,
+		TimeoutConfig: timeoutConfig,
+
+		Queue:          queue,
+		MinibatchStore: minibatchStore,
+		Dispatcher:     dispatcher,
+		EncoderClient:  encoderClient,
+
+		ChainState:            chainState,
+		AssignmentCoordinator: assignmentCoordinator,
+		Aggregator:            aggregator,
+		EncodingStreamer:      encodingStreamer,
+		Transactor:            transactor,
+		TransactionManager:    txnManager,
+		Metrics:               metrics,
+
+		ethClient:      ethClient,
+		finalizer:      finalizer,
+		logger:         logger.With("component", "Orchestrator"),
+		HeartbeatChan:  heartbeatChan,
+		MiniBatcher:    miniBatcher,
+		BatchConfirmer: batchConfirmer,
+	}, nil
+}
+
+func (o *Orchestrator) Start(ctx context.Context) error {
+	err := o.ChainState.Start(ctx)
+	if err != nil {
+		return err
+	}
+
+	// Wait a few seconds for the indexer to index the blockchain.
+	// This won't be needed once we switch to using a Graph node.
+	time.Sleep(indexerWarmupDelay)
+	err = o.EncodingStreamer.Start(ctx)
+	if err != nil {
+		return err
+	}
+	batchTrigger := o.EncodingStreamer.EncodedSizeNotifier
+
+	err = o.BatchConfirmer.Start(ctx)
+	if err != nil {
+		return err
+	}
+
+	go func() {
+		receiptChan := o.TransactionManager.ReceiptChan()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case receiptOrErr := <-receiptChan:
+				o.logger.Info("received response from transaction manager", "receipt", receiptOrErr.Receipt, "err", receiptOrErr.Err)
+				err := o.BatchConfirmer.ProcessConfirmedBatch(ctx, receiptOrErr)
+				if err != nil {
+					o.logger.Error("failed to process confirmed batch", "err", err)
+				}
+			}
+		}
+	}()
+	o.TransactionManager.Start(ctx)
+
+	o.finalizer.Start(ctx)
+
+	go func() {
+		ticker := time.NewTicker(o.PullInterval)
+		defer ticker.Stop()
+		cancelFuncs := make([]context.CancelFunc, 0)
+		for {
+			select {
+			case <-ctx.Done():
+				for _, cancel := range cancelFuncs {
+					cancel()
+				}
+				return
+			case <-ticker.C:
+				o.signalLiveness()
+				cancel, err := o.MiniBatcher.HandleSingleMinibatch(ctx)
+				if err != nil {
+					if errors.Is(err, errNoEncodedResults) {
+						o.logger.Warn("no encoded results to construct minibatch")
+					} else {
+						o.logger.Error("failed to process minibatch", "err", err)
+					}
+				}
+				if cancel != nil {
+					cancelFuncs = append(cancelFuncs, cancel)
+				}
+			case <-batchTrigger.Notify:
+				ticker.Stop()
+				o.signalLiveness()
+				cancel, err := o.MiniBatcher.HandleSingleMinibatch(ctx)
+				if err != nil {
+					if errors.Is(err, errNoEncodedResults) {
+						o.logger.Warn("no encoded results to construct minibatch")
+					} else {
+						o.logger.Error("failed to process minibatch", "err", err)
+					}
+				}
+				if cancel != nil {
+					cancelFuncs = append(cancelFuncs, cancel)
+				}
+				ticker.Reset(o.PullInterval)
+			}
+		}
+	}()
+	return nil
+}
+
+func (o *Orchestrator) signalLiveness() {
+	select {
+	case o.HeartbeatChan <- time.Now():
+		o.logger.Info("Heartbeat signal sent")
+	default:
+		// This case happens if there's no receiver ready to consume the heartbeat signal.
+		// It prevents the goroutine from blocking if the channel is full or not being listened to.
+		o.logger.Warn("Heartbeat signal skipped, no receiver on the channel")
+	}
+}
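signalLiveness above uses the standard non-blocking send idiom: a select with a default arm drops the heartbeat when nobody is ready to receive, so the orchestration loop can never wedge on its own liveness channel. A self-contained sketch of the idiom (trySignal is an illustrative name):

    package main

    import (
    	"fmt"
    	"time"
    )

    // trySignal mirrors Orchestrator.signalLiveness: send the current time
    // if a receiver (or buffer slot) is available, otherwise drop the
    // signal rather than block the caller.
    func trySignal(heartbeat chan time.Time) bool {
    	select {
    	case heartbeat <- time.Now():
    		return true
    	default:
    		// No receiver ready and buffer full: never block.
    		return false
    	}
    }

    func main() {
    	hb := make(chan time.Time, 1)
    	fmt.Println(trySignal(hb)) // true: buffer has room
    	fmt.Println(trySignal(hb)) // false: buffer full, dropped
    }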
diff --git a/disperser/cmd/batcher/config.go b/disperser/cmd/batcher/config.go
index 9ad499aa40..ad432bc4eb 100644
--- a/disperser/cmd/batcher/config.go
+++ b/disperser/cmd/batcher/config.go
@@ -32,8 +32,6 @@ type Config struct {
 	BLSOperatorStateRetrieverAddr string
 	EigenDAServiceManagerAddr     string

-	EnableMinibatch bool
-
 	EnableGnarkBundleEncoding bool
 }

@@ -57,21 +55,31 @@ func NewConfig(ctx *cli.Context) (Config, error) {
 		EncoderConfig: kzg.ReadCLIConfig(ctx),
 		LoggerConfig:  *loggerConfig,
 		BatcherConfig: batcher.Config{
-			PullInterval:             ctx.GlobalDuration(flags.PullIntervalFlag.Name),
-			FinalizerInterval:        ctx.GlobalDuration(flags.FinalizerIntervalFlag.Name),
-			FinalizerPoolSize:        ctx.GlobalInt(flags.FinalizerPoolSizeFlag.Name),
-			EncoderSocket:            ctx.GlobalString(flags.EncoderSocket.Name),
-			NumConnections:           ctx.GlobalInt(flags.NumConnectionsFlag.Name),
-			EncodingRequestQueueSize: ctx.GlobalInt(flags.EncodingRequestQueueSizeFlag.Name),
-			BatchSizeMBLimit:         ctx.GlobalUint(flags.BatchSizeLimitFlag.Name),
-			SRSOrder:                 ctx.GlobalInt(flags.SRSOrderFlag.Name),
-			MaxNumRetriesPerBlob:     ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name),
-			TargetNumChunks:          ctx.GlobalUint(flags.TargetNumChunksFlag.Name),
-			MaxBlobsToFetchFromStore: ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name),
-			FinalizationBlockDelay:   ctx.GlobalUint(flags.FinalizationBlockDelayFlag.Name),
+			PullInterval:                 ctx.GlobalDuration(flags.PullIntervalFlag.Name),
+			FinalizerInterval:            ctx.GlobalDuration(flags.FinalizerIntervalFlag.Name),
+			FinalizerPoolSize:            ctx.GlobalInt(flags.FinalizerPoolSizeFlag.Name),
+			EncoderSocket:                ctx.GlobalString(flags.EncoderSocket.Name),
+			NumConnections:               ctx.GlobalInt(flags.NumConnectionsFlag.Name),
+			EncodingRequestQueueSize:     ctx.GlobalInt(flags.EncodingRequestQueueSizeFlag.Name),
+			BatchSizeMBLimit:             ctx.GlobalUint(flags.BatchSizeLimitFlag.Name),
+			SRSOrder:                     ctx.GlobalInt(flags.SRSOrderFlag.Name),
+			MaxNumRetriesPerBlob:         ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name),
+			TargetNumChunks:              ctx.GlobalUint(flags.TargetNumChunksFlag.Name),
+			MaxBlobsToFetchFromStore:     ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name),
+			FinalizationBlockDelay:       ctx.GlobalUint(flags.FinalizationBlockDelayFlag.Name),
+			EnableMinibatch:              ctx.Bool(flags.EnableMinibatchFlag.Name),
+			BatchstoreTableName:          ctx.GlobalString(flags.BatchstoreTableNameFlag.Name),
+			DispersalStatusCheckInterval: ctx.GlobalDuration(flags.DispersalStatusCheckIntervalFlag.Name),
+			MinibatcherConfig: batcher.MinibatcherConfig{
+				PullInterval:              ctx.GlobalDuration(flags.MinibatcherPullIntervalFlag.Name),
+				MaxNumConnections:         ctx.GlobalUint(flags.MaxNodeConnectionsFlag.Name),
+				MaxNumRetriesPerBlob:      ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name),
+				MaxNumRetriesPerDispersal: ctx.GlobalUint(flags.MaxNumRetriesPerDispersalFlag.Name),
+			},
 		},
 		TimeoutConfig: batcher.TimeoutConfig{
 			EncodingTimeout:    ctx.GlobalDuration(flags.EncodingTimeoutFlag.Name),
+			DispersalTimeout:   ctx.GlobalDuration(flags.DispersalTimeoutFlag.Name),
 			AttestationTimeout: ctx.GlobalDuration(flags.AttestationTimeoutFlag.Name),
 			ChainReadTimeout:   ctx.GlobalDuration(flags.ChainReadTimeoutFlag.Name),
 			ChainWriteTimeout:  ctx.GlobalDuration(flags.ChainWriteTimeoutFlag.Name),
@@ -89,7 +97,6 @@ func NewConfig(ctx *cli.Context) (Config, error) {
 		IndexerDataDir:            ctx.GlobalString(flags.IndexerDataDirFlag.Name),
 		IndexerConfig:             indexer.ReadIndexerConfig(ctx),
 		KMSKeyConfig:              kmsConfig,
-		EnableMinibatch:           ctx.Bool(flags.EnableMinibatchFlag.Name),
 		EnableGnarkBundleEncoding: ctx.Bool(flags.EnableGnarkBundleEncodingFlag.Name),
 	}
 	return config, nil
diff --git a/disperser/cmd/batcher/flags/flags.go b/disperser/cmd/batcher/flags/flags.go
index 91c5f5ea6f..3bea077367 100644
--- a/disperser/cmd/batcher/flags/flags.go
+++ b/disperser/cmd/batcher/flags/flags.go
@@ -228,6 +228,26 @@ var (
 		EnvVar:   common.PrefixEnvVar(envVarPrefix, "MAX_NUM_RETRIES_PER_DISPERSAL"),
 		Value:    3,
 	}
+	BatchstoreTableNameFlag = cli.StringFlag{
+		Name:     common.PrefixFlag(FlagPrefix, "batchstore-table-name"),
+		Usage:    "Name of the DynamoDB table that stores the minibatch data model",
+		Required: false,
+		EnvVar:   common.PrefixEnvVar(envVarPrefix, "BATCHSTORE_TABLE_NAME"),
+	}
+	DispersalStatusCheckIntervalFlag = cli.DurationFlag{
+		Name:     common.PrefixFlag(FlagPrefix, "dispersal-status-check-interval"),
+		Usage:    "Interval at which to check the status of pending dispersals",
+		Required: false,
+		Value:    5 * time.Second,
+		EnvVar:   common.PrefixEnvVar(envVarPrefix, "DISPERSAL_STATUS_CHECK_INTERVAL"),
+	}
+	DispersalTimeoutFlag = cli.DurationFlag{
+		Name:     common.PrefixFlag(FlagPrefix, "dispersal-timeout"),
+		Usage:    "Connection timeout for the gRPC dispersal calls to DA nodes",
+		Required: false,
+		Value:    60 * time.Second,
+		EnvVar:   common.PrefixEnvVar(envVarPrefix, "DISPERSAL_TIMEOUT"),
+	}
 )

 var requiredFlags = []cli.Flag{
@@ -265,6 +285,9 @@ var optionalFlags = []cli.Flag{
 	MaxNodeConnectionsFlag,
 	MaxNumRetriesPerDispersalFlag,
 	EnableGnarkBundleEncodingFlag,
+	BatchstoreTableNameFlag,
+	DispersalStatusCheckIntervalFlag,
+	DispersalTimeoutFlag,
 }

 // Flags contains the list of configuration options available to the binary.
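Note: the two new duration flags now go through common.PrefixFlag like every other flag in this file (the originally submitted hunk used bare names, inconsistent with their neighbors), so CLI names and environment variables stay in lockstep. A sketch of how such prefix helpers typically compose, with assumed prefix values and separators ("batcher"/"BATCHER" are illustrative, not necessarily this repo's actual values):

    package main

    import "fmt"

    // prefixFlag and prefixEnvVar mirror the roles of common.PrefixFlag and
    // common.PrefixEnvVar used above; separators and prefixes here are
    // assumptions for illustration only.
    func prefixFlag(prefix, name string) string   { return prefix + "-" + name }
    func prefixEnvVar(prefix, name string) string { return prefix + "_" + name }

    func main() {
    	fmt.Println(prefixFlag("batcher", "dispersal-timeout"))   // batcher-dispersal-timeout
    	fmt.Println(prefixEnvVar("BATCHER", "DISPERSAL_TIMEOUT")) // BATCHER_DISPERSAL_TIMEOUT
    }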
diff --git a/disperser/cmd/batcher/main.go b/disperser/cmd/batcher/main.go
index bdbb43aa52..42ab35f4c4 100644
--- a/disperser/cmd/batcher/main.go
+++ b/disperser/cmd/batcher/main.go
@@ -18,6 +18,7 @@ import (
 	"github.com/Layr-Labs/eigenda/core"
 	coreeth "github.com/Layr-Labs/eigenda/core/eth"
 	"github.com/Layr-Labs/eigenda/disperser/batcher"
+	"github.com/Layr-Labs/eigenda/disperser/batcher/batchstore"
 	dispatcher "github.com/Layr-Labs/eigenda/disperser/batcher/grpc"
 	"github.com/Layr-Labs/eigenda/disperser/cmd/batcher/flags"
 	"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
@@ -84,6 +85,10 @@ func RunBatcher(ctx *cli.Context) error {
 		return err
 	}

+	if config.BatcherConfig.EnableMinibatch && config.BatcherConfig.BatchstoreTableName == "" {
+		return errors.New("minibatch store table name required when minibatch is enabled")
+	}
+
 	bucketName := config.BlobstoreConfig.BucketName
 	s3Client, err := s3.NewClient(context.Background(), config.AwsClientConfig, logger)
 	if err != nil {
@@ -129,9 +134,7 @@ func RunBatcher(ctx *cli.Context) error {
 			return fmt.Errorf("failed to get chain ID: %w", err)
 		}
 		signer := signerv2.NewKMSSigner(context.Background(), kmsClient, pubKey, config.KMSKeyConfig.KeyID, chainID)
-		if err != nil {
-			return err
-		}
+
 		wallet, err = walletsdk.NewPrivateKeyWallet(client, signer, addr, logger)
 		if err != nil {
 			return err
@@ -239,9 +242,16 @@ func RunBatcher(ctx *cli.Context) error {
 		logger.Info("Enabled metrics for Batcher", "socket", httpSocket)
 	}

-	if config.EnableMinibatch {
-		// TODO: implement and run batchConfirmer for minibatch
-		return errors.New("minibatch is not supported")
+	if config.BatcherConfig.EnableMinibatch {
+		minibatchStore := batchstore.NewMinibatchStore(dynamoClient, logger, config.BatcherConfig.BatchstoreTableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second)
+		orchestrator, err := batcher.NewOrchestrator(config.BatcherConfig, config.TimeoutConfig, queue, minibatchStore, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics, handleBatchLivenessChan)
+		if err != nil {
+			return err
+		}
+		err = orchestrator.Start(context.Background())
+		if err != nil {
+			return err
+		}
 	} else {
 		batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics, handleBatchLivenessChan)
 		if err != nil {
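The minibatch store TTL in the hunk above converts a retention window measured in blocks into wall-clock time, assuming Ethereum's ~12-second block time. A standalone sketch of the arithmetic (the input values below are made up for illustration):

    package main

    import (
    	"fmt"
    	"time"
    )

    // storeTTL mirrors the expression in RunBatcher: retention blocks times
    // an assumed 12s Ethereum block time.
    func storeTTL(storeDurationBlocks, blockStaleMeasure uint32) time.Duration {
    	return time.Duration((storeDurationBlocks+blockStaleMeasure)*12) * time.Second
    }

    func main() {
    	// (100800 + 300) blocks * 12s = 1,213,200s, roughly 14 days.
    	fmt.Println(storeTTL(100800, 300)) // 337h0m0s
    }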