From 0b5e1199f9eff55b90ddd96d28c2a8edfa15074a Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Tue, 13 Aug 2024 18:56:43 -0700 Subject: [PATCH 01/10] Enable minibatcher Refactor batcher/minibatcher creation to use generic interface Fix integration test Fix tests Update batcher interface type cast Move minibatcher tablename check to earlier init phase Make minibatcher table name optional Add info output at startup Lint Remove log output Rename MinibatchStore to Batchstore Remove generic batch/minibatcher interface Add orchestrator abstraction Add metrics Add finalizer Refactor and consolidate encoding client/streamer init Revert batcher to original Move minibatcher branch logic to main Move finalizer, encodingstreamer, chain state, assignment mgr into orchestrator Add batchConfirmer to orchestrator Move signalLiveness into orchestrator Add transactionMgr startup to orchestrator Lint Lint Lint Lint --- disperser/batcher/batch_confirmer.go | 10 +- disperser/batcher/batch_confirmer_test.go | 7 +- disperser/batcher/batcher.go | 10 + disperser/batcher/batcher_test.go | 2 - .../batcher/batchstore/minibatch_store.go | 2 +- .../batchstore/minibatch_store_test.go | 2 +- disperser/batcher/minibatcher.go | 96 ++------ disperser/batcher/minibatcher_test.go | 87 +++---- disperser/batcher/orchestrator.go | 216 ++++++++++++++++++ disperser/cmd/batcher/config.go | 37 +-- disperser/cmd/batcher/flags/flags.go | 23 ++ disperser/cmd/batcher/main.go | 22 +- 12 files changed, 358 insertions(+), 156 deletions(-) create mode 100644 disperser/batcher/orchestrator.go diff --git a/disperser/batcher/batch_confirmer.go b/disperser/batcher/batch_confirmer.go index be68e467f4..1ab2853362 100644 --- a/disperser/batcher/batch_confirmer.go +++ b/disperser/batcher/batch_confirmer.go @@ -25,7 +25,7 @@ type BatchConfirmerConfig struct { DispersalStatusCheckInterval time.Duration AttestationTimeout time.Duration SRSOrder int - NumConnections int + NumConnections uint MaxNumRetriesPerBlob uint } @@ -46,6 +46,7 @@ type BatchConfirmer struct { ethClient common.EthClient logger logging.Logger + Metrics *Metrics } func NewBatchConfirmer( @@ -62,6 +63,7 @@ func NewBatchConfirmer( txnManager TxnManager, minibatcher *Minibatcher, logger logging.Logger, + metrics *Metrics, ) (*BatchConfirmer, error) { return &BatchConfirmer{ BatchConfirmerConfig: config, @@ -76,6 +78,7 @@ func NewBatchConfirmer( Transactor: transactor, TransactionManager: txnManager, Minibatcher: minibatcher, + Metrics: metrics, ethClient: ethClient, logger: logger.With("component", "BatchConfirmer"), @@ -285,6 +288,11 @@ func (b *BatchConfirmer) handleFailure(ctx context.Context, batchID uuid.UUID, b continue } + if reason == FailNoSignatures { + b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.InsufficientSignatures) + } else { + b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.Failed) + } numPermanentFailures++ } diff --git a/disperser/batcher/batch_confirmer_test.go b/disperser/batcher/batch_confirmer_test.go index bf17a56887..b4a8509b82 100644 --- a/disperser/batcher/batch_confirmer_test.go +++ b/disperser/batcher/batch_confirmer_test.go @@ -15,7 +15,6 @@ import ( bat "github.com/Layr-Labs/eigenda/disperser/batcher" batcherinmem "github.com/Layr-Labs/eigenda/disperser/batcher/inmem" batchermock "github.com/Layr-Labs/eigenda/disperser/batcher/mock" - batmock "github.com/Layr-Labs/eigenda/disperser/batcher/mock" 
"github.com/Layr-Labs/eigenda/disperser/common/inmem" dmock "github.com/Layr-Labs/eigenda/disperser/mock" "github.com/Layr-Labs/eigenda/encoding" @@ -57,7 +56,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents { dispatcher := dmock.NewDispatcher(state) blobStore := inmem.NewBlobStore() ethClient := &cmock.MockEthClient{} - txnManager := batmock.NewTxnManager() + txnManager := batchermock.NewTxnManager() minibatchStore := batcherinmem.NewMinibatchStore(logger) encodingWorkerPool := workerpool.New(10) encoderProver, err = makeTestProver() @@ -76,7 +75,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents { MaxNumConnections: 10, MaxNumRetriesPerBlob: 2, MaxNumRetriesPerDispersal: 1, - }, blobStore, minibatchStore, dispatcher, mockChainState, assignmentCoordinator, encodingStreamer, ethClient, pool, logger) + }, blobStore, minibatchStore, dispatcher, encodingStreamer, pool, logger, metrics) assert.NoError(t, err) config := bat.BatchConfirmerConfig{ @@ -88,7 +87,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents { SRSOrder: 3000, MaxNumRetriesPerBlob: 2, } - b, err := bat.NewBatchConfirmer(config, blobStore, minibatchStore, dispatcher, mockChainState, assignmentCoordinator, encodingStreamer, agg, ethClient, transactor, txnManager, minibatcher, logger) + b, err := bat.NewBatchConfirmer(config, blobStore, minibatchStore, dispatcher, mockChainState, assignmentCoordinator, encodingStreamer, agg, ethClient, transactor, txnManager, minibatcher, logger, metrics) assert.NoError(t, err) ethClient.On("BlockNumber").Return(uint64(initialBlock), nil) diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index 343d781839..e70044c233 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -41,6 +41,7 @@ type QuorumInfo struct { type TimeoutConfig struct { EncodingTimeout time.Duration + DispersalTimeout time.Duration AttestationTimeout time.Duration ChainReadTimeout time.Duration ChainWriteTimeout time.Duration @@ -64,6 +65,11 @@ type Config struct { TargetNumChunks uint MaxBlobsToFetchFromStore int + + EnableMinibatch bool + BatchstoreTableName string + MinibatcherConfig MinibatcherConfig + DispersalStatusCheckInterval time.Duration } type Batcher struct { @@ -105,6 +111,10 @@ func NewBatcher( metrics *Metrics, heartbeatChan chan time.Time, ) (*Batcher, error) { + if config.EnableMinibatch { + return nil, errors.New("minibatch is not supported") + } + batchTrigger := NewEncodedSizeNotifier( make(chan struct{}, 1), uint64(config.BatchSizeMBLimit)*1024*1024, // convert to bytes diff --git a/disperser/batcher/batcher_test.go b/disperser/batcher/batcher_test.go index 0a2e8d91a7..1306b3773e 100644 --- a/disperser/batcher/batcher_test.go +++ b/disperser/batcher/batcher_test.go @@ -41,7 +41,6 @@ type batcherComponents struct { transactor *coremock.MockTransactor txnManager *batchermock.MockTxnManager blobStore disperser.BlobStore - encoderClient *disperser.LocalEncoderClient encodingStreamer *bat.EncodingStreamer ethClient *cmock.MockEthClient dispatcher *dmock.Dispatcher @@ -153,7 +152,6 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time. 
transactor: transactor, txnManager: txnManager, blobStore: blobStore, - encoderClient: encoderClient, encodingStreamer: b.EncodingStreamer, ethClient: ethClient, dispatcher: dispatcher, diff --git a/disperser/batcher/batchstore/minibatch_store.go b/disperser/batcher/batchstore/minibatch_store.go index 97bb94bf37..3d42540fb3 100644 --- a/disperser/batcher/batchstore/minibatch_store.go +++ b/disperser/batcher/batchstore/minibatch_store.go @@ -35,7 +35,7 @@ type MinibatchStore struct { var _ batcher.MinibatchStore = (*MinibatchStore)(nil) -func NewMinibatchStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *MinibatchStore { +func NewMinibatchStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) batcher.MinibatchStore { logger.Debugf("creating minibatch store with table %s with TTL: %s", tableName, ttl) return &MinibatchStore{ dynamoDBClient: dynamoDBClient, diff --git a/disperser/batcher/batchstore/minibatch_store_test.go b/disperser/batcher/batchstore/minibatch_store_test.go index f85b505a87..27583f0de7 100644 --- a/disperser/batcher/batchstore/minibatch_store_test.go +++ b/disperser/batcher/batchstore/minibatch_store_test.go @@ -35,7 +35,7 @@ var ( localStackPort = "4566" dynamoClient *dynamodb.Client - minibatchStore *batchstore.MinibatchStore + minibatchStore batcher.MinibatchStore UUID = uuid.New() minibatchTableName = fmt.Sprintf("test-MinibatchStore-%v", UUID) diff --git a/disperser/batcher/minibatcher.go b/disperser/batcher/minibatcher.go index 16a7869247..c898a7167d 100644 --- a/disperser/batcher/minibatcher.go +++ b/disperser/batcher/minibatcher.go @@ -2,7 +2,6 @@ package batcher import ( "context" - "errors" "fmt" "time" @@ -32,13 +31,11 @@ type BatchState struct { type Minibatcher struct { MinibatcherConfig - BlobStore disperser.BlobStore - MinibatchStore MinibatchStore - Dispatcher disperser.Dispatcher - ChainState core.IndexedChainState - AssignmentCoordinator core.AssignmentCoordinator - EncodingStreamer *EncodingStreamer - Pool common.WorkerPool + BlobStore disperser.BlobStore + MinibatchStore MinibatchStore + Dispatcher disperser.Dispatcher + EncodingStreamer *EncodingStreamer + Pool common.WorkerPool // local state Batches map[uuid.UUID]*BatchState @@ -46,8 +43,8 @@ type Minibatcher struct { CurrentBatchID uuid.UUID MinibatchIndex uint - ethClient common.EthClient - logger logging.Logger + Metrics *Metrics + logger logging.Logger } func NewMinibatcher( @@ -55,69 +52,32 @@ func NewMinibatcher( blobStore disperser.BlobStore, minibatchStore MinibatchStore, dispatcher disperser.Dispatcher, - chainState core.IndexedChainState, - assignmentCoordinator core.AssignmentCoordinator, encodingStreamer *EncodingStreamer, - ethClient common.EthClient, workerpool common.WorkerPool, logger logging.Logger, + metrics *Metrics, ) (*Minibatcher, error) { return &Minibatcher{ - MinibatcherConfig: config, - BlobStore: blobStore, - MinibatchStore: minibatchStore, - Dispatcher: dispatcher, - ChainState: chainState, - AssignmentCoordinator: assignmentCoordinator, - EncodingStreamer: encodingStreamer, - Pool: workerpool, + MinibatcherConfig: config, + BlobStore: blobStore, + MinibatchStore: minibatchStore, + Dispatcher: dispatcher, + EncodingStreamer: encodingStreamer, + Pool: workerpool, Batches: make(map[uuid.UUID]*BatchState), ReferenceBlockNumber: 0, CurrentBatchID: uuid.Nil, MinibatchIndex: 0, - ethClient: ethClient, - logger: logger.With("component", "Minibatcher"), + logger: 
logger.With("component", "Minibatcher"), + Metrics: metrics, }, nil } -func (b *Minibatcher) Start(ctx context.Context) error { - err := b.ChainState.Start(ctx) - if err != nil { - return err - } - // Wait for few seconds for indexer to index blockchain - // This won't be needed when we switch to using Graph node - time.Sleep(indexerWarmupDelay) - go func() { - ticker := time.NewTicker(b.PullInterval) - defer ticker.Stop() - cancelFuncs := make([]context.CancelFunc, 0) - for { - select { - case <-ctx.Done(): - for _, cancel := range cancelFuncs { - cancel() - } - return - case <-ticker.C: - cancel, err := b.HandleSingleMinibatch(ctx) - if err != nil { - if errors.Is(err, errNoEncodedResults) { - b.logger.Warn("no encoded results to make a batch with") - } else { - b.logger.Error("failed to process a batch", "err", err) - } - } - if cancel != nil { - cancelFuncs = append(cancelFuncs, cancel) - } - } - } - }() - - return nil +func (b *Minibatcher) RecoverState(ctx context.Context) error { + //TODO: Implement minibatch recovery + return fmt.Errorf("minibatch state recovery not implemented") } func (b *Minibatcher) PopBatchState(batchID uuid.UUID) *BatchState { @@ -136,7 +96,7 @@ func (b *Minibatcher) handleFailure(ctx context.Context, blobMetadatas []*disper b.EncodingStreamer.RemoveEncodedBlob(metadata) retry, err := b.BlobStore.HandleBlobFailure(ctx, metadata, b.MaxNumRetriesPerBlob) if err != nil { - b.logger.Error("HandleSingleBatch: error handling blob failure", "err", err) + b.logger.Error("error handling blobstore blob failure", "err", err) // Append the error result = multierror.Append(result, err) } @@ -153,6 +113,7 @@ func (b *Minibatcher) handleFailure(ctx context.Context, blobMetadatas []*disper func (b *Minibatcher) HandleSingleMinibatch(ctx context.Context) (context.CancelFunc, error) { log := b.logger + // If too many dispersal requests are pending, skip an iteration if pending := b.Pool.WaitingQueueSize(); pending > int(b.MaxNumConnections) { return nil, fmt.Errorf("too many pending requests %d with max number of connections %d. 
skipping minibatch iteration", pending, b.MaxNumConnections) @@ -218,16 +179,7 @@ func (b *Minibatcher) HandleSingleMinibatch(ctx context.Context) (context.Cancel err := b.createBlobMinibatchMappings(ctx, b.CurrentBatchID, b.MinibatchIndex, minibatch.BlobMetadata, minibatch.BlobHeaders) storeMappingsChan <- err }() - - // Disperse the minibatch to operators in all quorums - // If an operator doesn't have any bundles, it won't receive any chunks but it will still receive blob headers - operatorsAllQuorums, err := b.ChainState.GetIndexedOperators(ctx, b.ReferenceBlockNumber) - if err != nil { - cancelDispersal() - _ = b.handleFailure(ctx, minibatch.BlobMetadata, FailReason("error getting operator state for all quorums")) - return nil, fmt.Errorf("error getting operator state for all quorums: %w", err) - } - b.DisperseBatch(dispersalCtx, operatorsAllQuorums, minibatch.EncodedBlobs, minibatch.BatchHeader, b.CurrentBatchID, b.MinibatchIndex) + b.DisperseBatch(dispersalCtx, minibatch.State, minibatch.EncodedBlobs, minibatch.BatchHeader, b.CurrentBatchID, b.MinibatchIndex) log.Debug("DisperseBatch took", "duration", time.Since(stageTimer).String()) h, err := minibatch.State.OperatorState.Hash() @@ -252,8 +204,8 @@ func (b *Minibatcher) HandleSingleMinibatch(ctx context.Context) (context.Cancel return cancelDispersal, nil } -func (b *Minibatcher) DisperseBatch(ctx context.Context, operators map[core.OperatorID]*core.IndexedOperatorInfo, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, batchID uuid.UUID, minibatchIndex uint) { - for id, op := range operators { +func (b *Minibatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, batchID uuid.UUID, minibatchIndex uint) { + for id, op := range state.IndexedOperators { opInfo := op opID := id req := &MinibatchDispersal{ diff --git a/disperser/batcher/minibatcher_test.go b/disperser/batcher/minibatcher_test.go index 73b099624e..0f347021cf 100644 --- a/disperser/batcher/minibatcher_test.go +++ b/disperser/batcher/minibatcher_test.go @@ -8,7 +8,6 @@ import ( "time" cmock "github.com/Layr-Labs/eigenda/common/mock" - commonmock "github.com/Layr-Labs/eigenda/common/mock" "github.com/Layr-Labs/eigenda/core" coremock "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/disperser" @@ -27,7 +26,6 @@ import ( var ( opId0, _ = core.OperatorIDFromHex("e22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311") opId1, _ = core.OperatorIDFromHex("e23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568312") - opId2, _ = core.OperatorIDFromHex("e23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568313") mockChainState, _ = coremock.NewChainDataMock(map[uint8]map[core.OperatorID]int{ 0: { opId0: 1, @@ -36,9 +34,6 @@ var ( 1: { opId0: 1, }, - 2: { - opId2: 1, - }, }) defaultConfig = batcher.MinibatcherConfig{ PullInterval: 1 * time.Second, @@ -52,16 +47,15 @@ const ( ) type minibatcherComponents struct { - minibatcher *batcher.Minibatcher - blobStore disperser.BlobStore - minibatchStore batcher.MinibatchStore - dispatcher *dmock.Dispatcher - chainState *coremock.ChainDataMock - assignmentCoordinator core.AssignmentCoordinator - encodingStreamer *batcher.EncodingStreamer - pool *workerpool.WorkerPool - ethClient *commonmock.MockEthClient - logger logging.Logger + minibatcher *batcher.Minibatcher + blobStore disperser.BlobStore + minibatchStore batcher.MinibatchStore + dispatcher *dmock.Dispatcher + chainState *core.IndexedOperatorState + 
encodingStreamer *batcher.EncodingStreamer + pool *workerpool.WorkerPool + logger logging.Logger + metrics *batcher.Metrics } func newMinibatcher(t *testing.T, config batcher.MinibatcherConfig) *minibatcherComponents { @@ -94,22 +88,18 @@ func newMinibatcher(t *testing.T, config batcher.MinibatcherConfig) *minibatcher ) encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, chainState, encoderClient, asgn, trigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, metrics, logger) assert.NoError(t, err) - ethClient := &cmock.MockEthClient{} pool := workerpool.New(int(config.MaxNumConnections)) - m, err := batcher.NewMinibatcher(config, blobStore, minibatchStore, dispatcher, chainState, asgn, encodingStreamer, ethClient, pool, logger) + m, err := batcher.NewMinibatcher(config, blobStore, minibatchStore, dispatcher, encodingStreamer, pool, logger, metrics) assert.NoError(t, err) - return &minibatcherComponents{ - minibatcher: m, - blobStore: blobStore, - minibatchStore: minibatchStore, - dispatcher: dispatcher, - chainState: chainState, - assignmentCoordinator: asgn, - encodingStreamer: encodingStreamer, - pool: pool, - ethClient: ethClient, - logger: logger, + minibatcher: m, + blobStore: blobStore, + minibatchStore: minibatchStore, + dispatcher: dispatcher, + encodingStreamer: encodingStreamer, + pool: pool, + logger: logger, + metrics: metrics, } } @@ -171,12 +161,10 @@ func TestDisperseMinibatch(t *testing.T) { // Check the dispersal records dispersal, err := c.minibatchStore.GetDispersal(ctx, c.minibatcher.CurrentBatchID, 0, opId0) assert.NoError(t, err) - operatorState, err := c.chainState.GetIndexedOperatorState(context.Background(), 0, []core.QuorumID{0, 1}) - assert.NoError(t, err) assert.Equal(t, dispersal.BatchID, c.minibatcher.CurrentBatchID) assert.Equal(t, dispersal.MinibatchIndex, uint(0)) assert.Equal(t, dispersal.OperatorID, opId0) - assert.Equal(t, dispersal.Socket, operatorState.IndexedOperators[opId0].Socket) + assert.Equal(t, dispersal.Socket, c.chainState.IndexedOperators[opId0].Socket) assert.Equal(t, dispersal.NumBlobs, uint(2)) assert.NotNil(t, dispersal.RequestedAt) @@ -185,7 +173,7 @@ func TestDisperseMinibatch(t *testing.T) { assert.Equal(t, dispersal.BatchID, c.minibatcher.CurrentBatchID) assert.Equal(t, dispersal.MinibatchIndex, uint(0)) assert.Equal(t, dispersal.OperatorID, opId1) - assert.Equal(t, dispersal.Socket, operatorState.IndexedOperators[opId1].Socket) + assert.Equal(t, dispersal.Socket, c.chainState.IndexedOperators[opId1].Socket) assert.Equal(t, dispersal.NumBlobs, uint(2)) assert.NotNil(t, dispersal.RequestedAt) @@ -342,11 +330,11 @@ func TestDisperseMinibatch(t *testing.T) { assert.Len(t, c.minibatcher.Batches, 1) assert.Nil(t, c.minibatcher.Batches[b.ID]) - c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 9) + c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 6) dispersals, err := c.minibatchStore.GetDispersalsByMinibatch(ctx, b.ID, 0) assert.NoError(t, err) - assert.Len(t, dispersals, 3) - opIDs := make([]core.OperatorID, 3) + assert.Len(t, dispersals, 2) + opIDs := make([]core.OperatorID, 2) for i, dispersal := range dispersals { assert.Equal(t, dispersal.BatchID, b.ID) assert.Equal(t, dispersal.MinibatchIndex, uint(0)) @@ -358,7 +346,7 @@ func TestDisperseMinibatch(t *testing.T) { assert.NoError(t, dispersal.Error) assert.Len(t, dispersal.Signatures, 1) } - assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1, opId2}) + assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1}) } func 
TestDisperseMinibatchFailure(t *testing.T) { @@ -416,11 +404,11 @@ func TestDisperseMinibatchFailure(t *testing.T) { assert.Equal(t, c.minibatcher.ReferenceBlockNumber, b.ReferenceBlockNumber) c.pool.StopWait() - c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 3) + c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 2) dispersals, err := c.minibatchStore.GetDispersalsByMinibatch(ctx, c.minibatcher.CurrentBatchID, 0) assert.NoError(t, err) - assert.Len(t, dispersals, 3) - opIDs := make([]core.OperatorID, 3) + assert.Len(t, dispersals, 2) + opIDs := make([]core.OperatorID, 2) for i, dispersal := range dispersals { assert.Equal(t, dispersal.BatchID, c.minibatcher.CurrentBatchID) assert.Equal(t, dispersal.MinibatchIndex, uint(0)) @@ -432,7 +420,7 @@ func TestDisperseMinibatchFailure(t *testing.T) { assert.NoError(t, dispersal.Error) assert.Len(t, dispersal.Signatures, 1) } - assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1, opId2}) + assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1}) } func TestSendBlobsToOperatorWithRetries(t *testing.T) { @@ -483,17 +471,10 @@ func TestSendBlobsToOperatorWithRetries(t *testing.T) { assert.NoError(t, err) assert.Len(t, signatures, 1) - c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fail")).Twice() - c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*core.Signature{sig}, nil).Once() - signatures, err = c.minibatcher.SendBlobsToOperatorWithRetries(ctx, batch.EncodedBlobs, batch.BatchHeader, batch.State.IndexedOperators[opId2], opId2, 3) - c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 6) - assert.NoError(t, err) - assert.Len(t, signatures, 1) - c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fail")).Times(3) c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*core.Signature{sig}, nil).Once() signatures, err = c.minibatcher.SendBlobsToOperatorWithRetries(ctx, batch.EncodedBlobs, batch.BatchHeader, batch.State.IndexedOperators[opId1], opId1, 3) - c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 9) + c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 6) assert.ErrorContains(t, err, "failed to send chunks to operator") assert.Nil(t, signatures) } @@ -516,17 +497,15 @@ func TestSendBlobsToOperatorWithRetriesCanceled(t *testing.T) { assert.NoError(t, err) batch, err := c.encodingStreamer.CreateMinibatch(ctx) assert.NoError(t, err) - operators, err := c.chainState.GetIndexedOperators(ctx, initialBlock) - assert.NoError(t, err) minibatchIndex := uint(12) c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, context.Canceled) - c.minibatcher.DisperseBatch(ctx, operators, batch.EncodedBlobs, batch.BatchHeader, c.minibatcher.CurrentBatchID, minibatchIndex) + c.minibatcher.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader, c.minibatcher.CurrentBatchID, minibatchIndex) c.pool.StopWait() dispersals, err := c.minibatchStore.GetDispersalsByMinibatch(ctx, c.minibatcher.CurrentBatchID, minibatchIndex) assert.NoError(t, err) - assert.Len(t, dispersals, 3) + assert.Len(t, dispersals, 2) - indexedState, err := mockChainState.GetIndexedOperatorState(ctx, initialBlock, []core.QuorumID{0, 1, 2}) + indexedState, err := mockChainState.GetIndexedOperatorState(ctx, initialBlock, 
[]core.QuorumID{0}) assert.NoError(t, err) assert.Len(t, dispersals, len(indexedState.IndexedOperators)) for _, dispersal := range dispersals { @@ -540,7 +519,7 @@ func TestMinibatcherTooManyPendingRequests(t *testing.T) { ctx := context.Background() mockWorkerPool := &cmock.MockWorkerpool{} // minibatcher with mock worker pool - m, err := batcher.NewMinibatcher(defaultConfig, c.blobStore, c.minibatchStore, c.dispatcher, c.minibatcher.ChainState, c.assignmentCoordinator, c.encodingStreamer, c.ethClient, mockWorkerPool, c.logger) + m, err := batcher.NewMinibatcher(defaultConfig, c.blobStore, c.minibatchStore, c.dispatcher, c.encodingStreamer, mockWorkerPool, c.logger, c.metrics) assert.NoError(t, err) mockWorkerPool.On("WaitingQueueSize").Return(int(defaultConfig.MaxNumConnections + 1)).Once() _, err = m.HandleSingleMinibatch(ctx) diff --git a/disperser/batcher/orchestrator.go b/disperser/batcher/orchestrator.go new file mode 100644 index 0000000000..a24528494a --- /dev/null +++ b/disperser/batcher/orchestrator.go @@ -0,0 +1,216 @@ +package batcher + +import ( + "context" + "errors" + "time" + + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/gammazero/workerpool" +) + +type Orchestrator struct { + Config + TimeoutConfig + + Queue disperser.BlobStore + MinibatchStore MinibatchStore + Dispatcher disperser.Dispatcher + EncoderClient disperser.EncoderClient + + ChainState core.IndexedChainState + AssignmentCoordinator core.AssignmentCoordinator + Aggregator core.SignatureAggregator + EncodingStreamer *EncodingStreamer + Transactor core.Transactor + TransactionManager TxnManager + Metrics *Metrics + HeartbeatChan chan time.Time + + ethClient common.EthClient + finalizer Finalizer + logger logging.Logger + + MiniBatcher *Minibatcher + BatchConfirmer *BatchConfirmer +} + +func NewOrchestrator( + config Config, + timeoutConfig TimeoutConfig, + queue disperser.BlobStore, + minibatchStore MinibatchStore, + dispatcher disperser.Dispatcher, + chainState core.IndexedChainState, + assignmentCoordinator core.AssignmentCoordinator, + encoderClient disperser.EncoderClient, + aggregator core.SignatureAggregator, + ethClient common.EthClient, + finalizer Finalizer, + transactor core.Transactor, + txnManager TxnManager, + logger logging.Logger, + metrics *Metrics, + heartbeatChan chan time.Time, +) (*Orchestrator, error) { + batchTrigger := NewEncodedSizeNotifier( + make(chan struct{}, 1), + uint64(config.BatchSizeMBLimit)*1024*1024, // convert to bytes + ) + streamerConfig := StreamerConfig{ + SRSOrder: config.SRSOrder, + EncodingRequestTimeout: config.PullInterval, + EncodingQueueLimit: config.EncodingRequestQueueSize, + TargetNumChunks: config.TargetNumChunks, + MaxBlobsToFetchFromStore: config.MaxBlobsToFetchFromStore, + FinalizationBlockDelay: config.FinalizationBlockDelay, + ChainStateTimeout: timeoutConfig.ChainStateTimeout, + } + encodingWorkerPool := workerpool.New(config.NumConnections) + encodingStreamer, err := NewEncodingStreamer(streamerConfig, queue, chainState, encoderClient, assignmentCoordinator, batchTrigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, metrics, logger) + if err != nil { + return nil, err + } + + miniBatcher, err := NewMinibatcher(config.MinibatcherConfig, queue, minibatchStore, dispatcher, encodingStreamer, encodingWorkerPool, logger, metrics) + if err != nil { + return nil, err + } + + batchConfirmerConfig := BatchConfirmerConfig{ + 
PullInterval: config.PullInterval,
+ DispersalTimeout: timeoutConfig.DispersalTimeout,
+ DispersalStatusCheckInterval: config.DispersalStatusCheckInterval,
+ AttestationTimeout: timeoutConfig.AttestationTimeout,
+ SRSOrder: config.SRSOrder,
+ NumConnections: config.MinibatcherConfig.MaxNumConnections,
+ MaxNumRetriesPerBlob: config.MinibatcherConfig.MaxNumRetriesPerBlob,
+ }
+ batchConfirmer, err := NewBatchConfirmer(batchConfirmerConfig, queue, minibatchStore, dispatcher, chainState, assignmentCoordinator, encodingStreamer, aggregator, ethClient, transactor, txnManager, miniBatcher, logger, metrics)
+ if err != nil {
+ return nil, err
+ }
+
+ return &Orchestrator{
+ Config: config,
+ TimeoutConfig: timeoutConfig,
+
+ Queue: queue,
+ MinibatchStore: minibatchStore,
+ Dispatcher: dispatcher,
+ EncoderClient: encoderClient,
+
+ ChainState: chainState,
+ AssignmentCoordinator: assignmentCoordinator,
+ Aggregator: aggregator,
+ EncodingStreamer: encodingStreamer,
+ Transactor: transactor,
+ TransactionManager: txnManager,
+ Metrics: metrics,
+
+ ethClient: ethClient,
+ finalizer: finalizer,
+ logger: logger.With("component", "Orchestrator"),
+ HeartbeatChan: heartbeatChan,
+ MiniBatcher: miniBatcher,
+ BatchConfirmer: batchConfirmer,
+ }, nil
+}
+
+func (o *Orchestrator) Start(ctx context.Context) error {
+ err := o.ChainState.Start(ctx)
+ if err != nil {
+ return err
+ }
+
+ // Wait a few seconds for the indexer to index the blockchain
+ // This won't be needed once we switch to using the Graph node
+ time.Sleep(indexerWarmupDelay)
+ err = o.EncodingStreamer.Start(ctx)
+ if err != nil {
+ return err
+ }
+ batchTrigger := o.EncodingStreamer.EncodedSizeNotifier
+
+ err = o.BatchConfirmer.Start(ctx)
+ if err != nil {
+ return err
+ }
+
+ go func() {
+ receiptChan := o.TransactionManager.ReceiptChan()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case receiptOrErr := <-receiptChan:
+ o.logger.Info("received response from transaction manager", "receipt", receiptOrErr.Receipt, "err", receiptOrErr.Err)
+ err := o.BatchConfirmer.ProcessConfirmedBatch(ctx, receiptOrErr)
+ if err != nil {
+ o.logger.Error("failed to process confirmed batch", "err", err)
+ }
+ }
+ }
+ }()
+ o.TransactionManager.Start(ctx)
+
+ o.finalizer.Start(ctx)
+
+ go func() {
+ ticker := time.NewTicker(o.PullInterval)
+ defer ticker.Stop()
+ cancelFuncs := make([]context.CancelFunc, 0)
+ for {
+ select {
+ case <-ctx.Done():
+ for _, cancel := range cancelFuncs {
+ cancel()
+ }
+ return
+ case <-ticker.C:
+ o.signalLiveness()
+ cancel, err := o.MiniBatcher.HandleSingleMinibatch(ctx)
+ if err != nil {
+ if errors.Is(err, errNoEncodedResults) {
+ o.logger.Warn("no encoded results to construct minibatch")
+ } else {
+ o.logger.Error("failed to process minibatch", "err", err)
+ }
+ }
+ if cancel != nil {
+ cancelFuncs = append(cancelFuncs, cancel)
+ }
+ case <-batchTrigger.Notify:
+ ticker.Stop()
+ o.signalLiveness()
+ cancel, err := o.MiniBatcher.HandleSingleMinibatch(ctx)
+ if err != nil {
+ if errors.Is(err, errNoEncodedResults) {
+ o.logger.Warn("no encoded results to construct minibatch")
+ } else {
+ o.logger.Error("failed to process minibatch", "err", err)
+ }
+ }
+ if cancel != nil {
+ cancelFuncs = append(cancelFuncs, cancel)
+ }
+ ticker.Reset(o.PullInterval)
+ }
+ }
+ }()
+ return nil
+}
+
+func (o *Orchestrator) signalLiveness() {
+ select {
+ case o.HeartbeatChan <- time.Now():
+ o.logger.Info("Heartbeat signal sent")
+ default:
+ // This case happens if there's no receiver ready to consume the heartbeat signal.
+ // It prevents the goroutine from blocking if the channel is full or not being listened to. + o.logger.Warn("Heartbeat signal skipped, no receiver on the channel") + } +} diff --git a/disperser/cmd/batcher/config.go b/disperser/cmd/batcher/config.go index 9ad499aa40..ad432bc4eb 100644 --- a/disperser/cmd/batcher/config.go +++ b/disperser/cmd/batcher/config.go @@ -32,8 +32,6 @@ type Config struct { BLSOperatorStateRetrieverAddr string EigenDAServiceManagerAddr string - EnableMinibatch bool - EnableGnarkBundleEncoding bool } @@ -57,21 +55,31 @@ func NewConfig(ctx *cli.Context) (Config, error) { EncoderConfig: kzg.ReadCLIConfig(ctx), LoggerConfig: *loggerConfig, BatcherConfig: batcher.Config{ - PullInterval: ctx.GlobalDuration(flags.PullIntervalFlag.Name), - FinalizerInterval: ctx.GlobalDuration(flags.FinalizerIntervalFlag.Name), - FinalizerPoolSize: ctx.GlobalInt(flags.FinalizerPoolSizeFlag.Name), - EncoderSocket: ctx.GlobalString(flags.EncoderSocket.Name), - NumConnections: ctx.GlobalInt(flags.NumConnectionsFlag.Name), - EncodingRequestQueueSize: ctx.GlobalInt(flags.EncodingRequestQueueSizeFlag.Name), - BatchSizeMBLimit: ctx.GlobalUint(flags.BatchSizeLimitFlag.Name), - SRSOrder: ctx.GlobalInt(flags.SRSOrderFlag.Name), - MaxNumRetriesPerBlob: ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name), - TargetNumChunks: ctx.GlobalUint(flags.TargetNumChunksFlag.Name), - MaxBlobsToFetchFromStore: ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name), - FinalizationBlockDelay: ctx.GlobalUint(flags.FinalizationBlockDelayFlag.Name), + PullInterval: ctx.GlobalDuration(flags.PullIntervalFlag.Name), + FinalizerInterval: ctx.GlobalDuration(flags.FinalizerIntervalFlag.Name), + FinalizerPoolSize: ctx.GlobalInt(flags.FinalizerPoolSizeFlag.Name), + EncoderSocket: ctx.GlobalString(flags.EncoderSocket.Name), + NumConnections: ctx.GlobalInt(flags.NumConnectionsFlag.Name), + EncodingRequestQueueSize: ctx.GlobalInt(flags.EncodingRequestQueueSizeFlag.Name), + BatchSizeMBLimit: ctx.GlobalUint(flags.BatchSizeLimitFlag.Name), + SRSOrder: ctx.GlobalInt(flags.SRSOrderFlag.Name), + MaxNumRetriesPerBlob: ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name), + TargetNumChunks: ctx.GlobalUint(flags.TargetNumChunksFlag.Name), + MaxBlobsToFetchFromStore: ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name), + FinalizationBlockDelay: ctx.GlobalUint(flags.FinalizationBlockDelayFlag.Name), + EnableMinibatch: ctx.Bool(flags.EnableMinibatchFlag.Name), + BatchstoreTableName: ctx.GlobalString(flags.BatchstoreTableNameFlag.Name), + DispersalStatusCheckInterval: ctx.GlobalDuration(flags.DispersalStatusCheckIntervalFlag.Name), + MinibatcherConfig: batcher.MinibatcherConfig{ + PullInterval: ctx.GlobalDuration(flags.MinibatcherPullIntervalFlag.Name), + MaxNumConnections: ctx.GlobalUint(flags.MaxNodeConnectionsFlag.Name), + MaxNumRetriesPerBlob: ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name), + MaxNumRetriesPerDispersal: ctx.GlobalUint(flags.MaxNumRetriesPerDispersalFlag.Name), + }, }, TimeoutConfig: batcher.TimeoutConfig{ EncodingTimeout: ctx.GlobalDuration(flags.EncodingTimeoutFlag.Name), + DispersalTimeout: ctx.GlobalDuration(flags.DispersalTimeoutFlag.Name), AttestationTimeout: ctx.GlobalDuration(flags.AttestationTimeoutFlag.Name), ChainReadTimeout: ctx.GlobalDuration(flags.ChainReadTimeoutFlag.Name), ChainWriteTimeout: ctx.GlobalDuration(flags.ChainWriteTimeoutFlag.Name), @@ -89,7 +97,6 @@ func NewConfig(ctx *cli.Context) (Config, error) { IndexerDataDir: ctx.GlobalString(flags.IndexerDataDirFlag.Name), IndexerConfig: 
indexer.ReadIndexerConfig(ctx),
KMSKeyConfig: kmsConfig,
- EnableMinibatch: ctx.Bool(flags.EnableMinibatchFlag.Name),
EnableGnarkBundleEncoding: ctx.Bool(flags.EnableGnarkBundleEncodingFlag.Name),
}
return config, nil
diff --git a/disperser/cmd/batcher/flags/flags.go b/disperser/cmd/batcher/flags/flags.go
index 91c5f5ea6f..3bea077367 100644
--- a/disperser/cmd/batcher/flags/flags.go
+++ b/disperser/cmd/batcher/flags/flags.go
@@ -228,6 +228,26 @@ var (
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_NUM_RETRIES_PER_DISPERSAL"),
Value: 3,
}
+ BatchstoreTableNameFlag = cli.StringFlag{
+ Name: common.PrefixFlag(FlagPrefix, "batchstore-table-name"),
+ Usage: "Name of the DynamoDB table for storing the minibatch data model",
+ Required: false,
+ EnvVar: common.PrefixEnvVar(envVarPrefix, "BATCHSTORE_TABLE_NAME"),
+ }
+ DispersalStatusCheckIntervalFlag = cli.DurationFlag{
+ Name: common.PrefixFlag(FlagPrefix, "dispersal-status-check-interval"),
+ Usage: "Interval at which to check the status of pending dispersals",
+ Required: false,
+ Value: 5 * time.Second,
+ EnvVar: common.PrefixEnvVar(envVarPrefix, "DISPERSAL_STATUS_CHECK_INTERVAL"),
+ }
+ DispersalTimeoutFlag = cli.DurationFlag{
+ Name: common.PrefixFlag(FlagPrefix, "dispersal-timeout"),
+ Usage: "Connection timeout for dispersal gRPC calls to DA nodes",
+ Required: false,
+ Value: 60 * time.Second,
+ EnvVar: common.PrefixEnvVar(envVarPrefix, "DISPERSAL_TIMEOUT"),
+ }
)
var requiredFlags = []cli.Flag{
@@ -265,6 +285,9 @@ var optionalFlags = []cli.Flag{
MaxNodeConnectionsFlag,
MaxNumRetriesPerDispersalFlag,
EnableGnarkBundleEncodingFlag,
+ BatchstoreTableNameFlag,
+ DispersalStatusCheckIntervalFlag,
+ DispersalTimeoutFlag,
}
// Flags contains the list of configuration options available to the binary.
diff --git a/disperser/cmd/batcher/main.go b/disperser/cmd/batcher/main.go
index bdbb43aa52..42ab35f4c4 100644
--- a/disperser/cmd/batcher/main.go
+++ b/disperser/cmd/batcher/main.go
@@ -18,6 +18,7 @@ import (
"github.com/Layr-Labs/eigenda/core"
coreeth "github.com/Layr-Labs/eigenda/core/eth"
"github.com/Layr-Labs/eigenda/disperser/batcher"
+ "github.com/Layr-Labs/eigenda/disperser/batcher/batchstore"
dispatcher "github.com/Layr-Labs/eigenda/disperser/batcher/grpc"
"github.com/Layr-Labs/eigenda/disperser/cmd/batcher/flags"
"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
@@ -84,6 +85,10 @@ func RunBatcher(ctx *cli.Context) error {
return err
}
+ if config.BatcherConfig.EnableMinibatch && config.BatcherConfig.BatchstoreTableName == "" {
+ return errors.New("minibatch store table name required when minibatch is enabled")
+ }
+
bucketName := config.BlobstoreConfig.BucketName
s3Client, err := s3.NewClient(context.Background(), config.AwsClientConfig, logger)
if err != nil {
@@ -129,9 +134,7 @@ func RunBatcher(ctx *cli.Context) error {
return fmt.Errorf("failed to get chain ID: %w", err)
}
signer := signerv2.NewKMSSigner(context.Background(), kmsClient, pubKey, config.KMSKeyConfig.KeyID, chainID)
- if err != nil {
- return err
- }
+
wallet, err = walletsdk.NewPrivateKeyWallet(client, signer, addr, logger)
if err != nil {
return err
}
@@ -239,9 +242,16 @@ func RunBatcher(ctx *cli.Context) error {
logger.Info("Enabled metrics for Batcher", "socket", httpSocket)
}
- if config.EnableMinibatch {
- // TODO: implement and run batchConfirmer for minibatch
- return errors.New("minibatch is not supported")
+ if config.BatcherConfig.EnableMinibatch {
+ minibatchStore := batchstore.NewMinibatchStore(dynamoClient, logger, config.BatcherConfig.BatchstoreTableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second)
+
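// Aside (not part of this patch): the TTL passed to NewMinibatchStore above assumes
// ~12 seconds per Ethereum block, i.e. (storeDurationBlocks + blockStaleMeasure) blocks
// converted to seconds. A minimal runnable sketch of that arithmetic; the two block
// counts below are hypothetical values, purely for illustration.
package main

import (
	"fmt"
	"time"
)

func main() {
	const secondsPerBlock = 12                // assumed average Ethereum block time
	storeDurationBlocks := uint32(100800)     // hypothetical: ~2 weeks of blocks
	blockStaleMeasure := uint32(300)          // hypothetical staleness margin
	ttl := time.Duration((storeDurationBlocks+blockStaleMeasure)*secondsPerBlock) * time.Second
	fmt.Println(ttl) // 337h0m0s, i.e. ~14 days
}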
orchestrator, err := batcher.NewOrchestrator(config.BatcherConfig, config.TimeoutConfig, queue, minibatchStore, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics, handleBatchLivenessChan) + if err != nil { + return err + } + err = orchestrator.Start(context.Background()) + if err != nil { + return err + } } else { batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics, handleBatchLivenessChan) if err != nil { From 1318903e091ac0be91f2b46af0646db8460196b7 Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Tue, 27 Aug 2024 21:21:57 -0700 Subject: [PATCH 02/10] Add reference block number to minibatch blob header --- core/data.go | 3 +++ disperser/batcher/encoding_streamer.go | 3 ++- disperser/batcher/grpc/dispatcher.go | 11 ++++++----- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/core/data.go b/core/data.go index 45f9e750f8..dc3cef5464 100644 --- a/core/data.go +++ b/core/data.go @@ -275,6 +275,9 @@ type BlobHeader struct { // AccountID is the account that is paying for the blob to be stored AccountID AccountID + + // Reference block number + ReferenceBlockNumber uint } func (b *BlobHeader) GetQuorumInfo(quorum QuorumID) *BlobQuorumInfo { diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index 6b7917455e..f88afbe289 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -492,7 +492,8 @@ func (e *EncodingStreamer) CreateMinibatch(ctx context.Context) (*batch, error) metadataByKey[blobKey] = result.BlobMetadata blobQuorums[blobKey] = make([]*core.BlobQuorumInfo, 0) blobHeader := &core.BlobHeader{ - BlobCommitments: *result.Commitment, + BlobCommitments: *result.Commitment, + ReferenceBlockNumber: e.ReferenceBlockNumber, } blobHeaderByKey[blobKey] = blobHeader encodedBlobByKey[blobKey] = core.EncodedBlob{ diff --git a/disperser/batcher/grpc/dispatcher.go b/disperser/batcher/grpc/dispatcher.go index 6597492722..a95e0adad1 100644 --- a/disperser/batcher/grpc/dispatcher.go +++ b/disperser/batcher/grpc/dispatcher.go @@ -409,11 +409,12 @@ func getBlobMessage(blob *core.EncodedBlobMessage, useGnarkBundleEncoding bool) return &node.Blob{ Header: &node.BlobHeader{ - Commitment: commitData, - LengthCommitment: &lengthCommitData, - LengthProof: &lengthProofData, - Length: uint32(blob.BlobHeader.Length), - QuorumHeaders: quorumHeaders, + Commitment: commitData, + LengthCommitment: &lengthCommitData, + LengthProof: &lengthProofData, + Length: uint32(blob.BlobHeader.Length), + QuorumHeaders: quorumHeaders, + ReferenceBlockNumber: uint32(blob.BlobHeader.ReferenceBlockNumber), }, Bundles: bundles, }, nil From 67d26ec2783f4ecd832885f92c65760e3a603692 Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Tue, 27 Aug 2024 23:09:15 -0700 Subject: [PATCH 03/10] Debug --- disperser/batcher/minibatcher.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/disperser/batcher/minibatcher.go b/disperser/batcher/minibatcher.go index c898a7167d..0f67171695 100644 --- a/disperser/batcher/minibatcher.go +++ b/disperser/batcher/minibatcher.go @@ -226,8 +226,10 @@ func (b *Minibatcher) DisperseBatch(ctx context.Context, state *core.IndexedOper if err != nil { b.logger.Errorf("failed to send blobs to operator %s: %v", opID.Hex(), err) } + 
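// Aside (not part of this patch): the signatures handled below are compressed to
// [][32]byte; per the DispersalResponse doc comment, they are "compressed using
// G1Affine.Bytes(), to be decompressed using G1Affine.SetBytes()". A minimal sketch
// of that BN254 round-trip using gnark-crypto directly; the generator point stands
// in for a real signature, illustration only.
package main

import (
	"fmt"

	"github.com/consensys/gnark-crypto/ecc/bn254"
)

func main() {
	_, _, g1, _ := bn254.Generators() // any valid G1 point works for the demo
	compressed := g1.Bytes()          // [32]byte compressed encoding

	var recovered bn254.G1Affine
	if _, err := recovered.SetBytes(compressed[:]); err != nil {
		panic(err)
	}
	fmt.Println(recovered.Equal(&g1)) // true
}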
b.logger.Debug("Received signatures from operator", "operator", opID.Hex(), "numSignatures", len(signatures)) compressedSignatures := make([][32]byte, 0, len(signatures)) - for _, signature := range signatures { + for idx, signature := range signatures { + b.logger.Debug("Append signature", "operator", opID.Hex(), "blobIndex", idx, "signature", signature) compressedSignatures = append(compressedSignatures, signature.Bytes()) } // Update the minibatch state From 47f7cb60cc6c2e6ad01cd681402b4e6bc2796e7f Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Wed, 28 Aug 2024 00:07:21 -0700 Subject: [PATCH 04/10] Fix tests --- disperser/batcher/minibatcher_test.go | 85 +++++++++++++++++---------- 1 file changed, 53 insertions(+), 32 deletions(-) diff --git a/disperser/batcher/minibatcher_test.go b/disperser/batcher/minibatcher_test.go index 0f347021cf..afa8472cd6 100644 --- a/disperser/batcher/minibatcher_test.go +++ b/disperser/batcher/minibatcher_test.go @@ -8,6 +8,7 @@ import ( "time" cmock "github.com/Layr-Labs/eigenda/common/mock" + commonmock "github.com/Layr-Labs/eigenda/common/mock" "github.com/Layr-Labs/eigenda/core" coremock "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/disperser" @@ -26,6 +27,7 @@ import ( var ( opId0, _ = core.OperatorIDFromHex("e22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311") opId1, _ = core.OperatorIDFromHex("e23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568312") + opId2, _ = core.OperatorIDFromHex("e23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568313") mockChainState, _ = coremock.NewChainDataMock(map[uint8]map[core.OperatorID]int{ 0: { opId0: 1, @@ -34,6 +36,9 @@ var ( 1: { opId0: 1, }, + 2: { + opId2: 1, + }, }) defaultConfig = batcher.MinibatcherConfig{ PullInterval: 1 * time.Second, @@ -47,15 +52,16 @@ const ( ) type minibatcherComponents struct { - minibatcher *batcher.Minibatcher - blobStore disperser.BlobStore - minibatchStore batcher.MinibatchStore - dispatcher *dmock.Dispatcher - chainState *core.IndexedOperatorState - encodingStreamer *batcher.EncodingStreamer - pool *workerpool.WorkerPool - logger logging.Logger - metrics *batcher.Metrics + minibatcher *batcher.Minibatcher + blobStore disperser.BlobStore + minibatchStore batcher.MinibatchStore + dispatcher *dmock.Dispatcher + chainState *coremock.ChainDataMock + assignmentCoordinator core.AssignmentCoordinator + encodingStreamer *batcher.EncodingStreamer + pool *workerpool.WorkerPool + ethClient *commonmock.MockEthClient + logger logging.Logger } func newMinibatcher(t *testing.T, config batcher.MinibatcherConfig) *minibatcherComponents { @@ -88,18 +94,22 @@ func newMinibatcher(t *testing.T, config batcher.MinibatcherConfig) *minibatcher ) encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, chainState, encoderClient, asgn, trigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, metrics, logger) assert.NoError(t, err) + ethClient := &cmock.MockEthClient{} pool := workerpool.New(int(config.MaxNumConnections)) m, err := batcher.NewMinibatcher(config, blobStore, minibatchStore, dispatcher, encodingStreamer, pool, logger, metrics) assert.NoError(t, err) + return &minibatcherComponents{ - minibatcher: m, - blobStore: blobStore, - minibatchStore: minibatchStore, - dispatcher: dispatcher, - encodingStreamer: encodingStreamer, - pool: pool, - logger: logger, - metrics: metrics, + minibatcher: m, + blobStore: blobStore, + minibatchStore: minibatchStore, + dispatcher: 
dispatcher, + chainState: chainState, + assignmentCoordinator: asgn, + encodingStreamer: encodingStreamer, + pool: pool, + ethClient: ethClient, + logger: logger, } } @@ -161,10 +171,12 @@ func TestDisperseMinibatch(t *testing.T) { // Check the dispersal records dispersal, err := c.minibatchStore.GetDispersal(ctx, c.minibatcher.CurrentBatchID, 0, opId0) assert.NoError(t, err) + operatorState, err := c.chainState.GetIndexedOperatorState(context.Background(), 0, []core.QuorumID{0, 1}) + assert.NoError(t, err) assert.Equal(t, dispersal.BatchID, c.minibatcher.CurrentBatchID) assert.Equal(t, dispersal.MinibatchIndex, uint(0)) assert.Equal(t, dispersal.OperatorID, opId0) - assert.Equal(t, dispersal.Socket, c.chainState.IndexedOperators[opId0].Socket) + assert.Equal(t, dispersal.Socket, operatorState.IndexedOperators[opId0].Socket) assert.Equal(t, dispersal.NumBlobs, uint(2)) assert.NotNil(t, dispersal.RequestedAt) @@ -173,7 +185,7 @@ func TestDisperseMinibatch(t *testing.T) { assert.Equal(t, dispersal.BatchID, c.minibatcher.CurrentBatchID) assert.Equal(t, dispersal.MinibatchIndex, uint(0)) assert.Equal(t, dispersal.OperatorID, opId1) - assert.Equal(t, dispersal.Socket, c.chainState.IndexedOperators[opId1].Socket) + assert.Equal(t, dispersal.Socket, operatorState.IndexedOperators[opId1].Socket) assert.Equal(t, dispersal.NumBlobs, uint(2)) assert.NotNil(t, dispersal.RequestedAt) @@ -330,11 +342,11 @@ func TestDisperseMinibatch(t *testing.T) { assert.Len(t, c.minibatcher.Batches, 1) assert.Nil(t, c.minibatcher.Batches[b.ID]) - c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 6) + c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 9) dispersals, err := c.minibatchStore.GetDispersalsByMinibatch(ctx, b.ID, 0) assert.NoError(t, err) - assert.Len(t, dispersals, 2) - opIDs := make([]core.OperatorID, 2) + assert.Len(t, dispersals, 3) + opIDs := make([]core.OperatorID, 3) for i, dispersal := range dispersals { assert.Equal(t, dispersal.BatchID, b.ID) assert.Equal(t, dispersal.MinibatchIndex, uint(0)) @@ -346,7 +358,7 @@ func TestDisperseMinibatch(t *testing.T) { assert.NoError(t, dispersal.Error) assert.Len(t, dispersal.Signatures, 1) } - assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1}) + assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1, opId2}) } func TestDisperseMinibatchFailure(t *testing.T) { @@ -404,11 +416,11 @@ func TestDisperseMinibatchFailure(t *testing.T) { assert.Equal(t, c.minibatcher.ReferenceBlockNumber, b.ReferenceBlockNumber) c.pool.StopWait() - c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 2) + c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 3) dispersals, err := c.minibatchStore.GetDispersalsByMinibatch(ctx, c.minibatcher.CurrentBatchID, 0) assert.NoError(t, err) - assert.Len(t, dispersals, 2) - opIDs := make([]core.OperatorID, 2) + assert.Len(t, dispersals, 3) + opIDs := make([]core.OperatorID, 3) for i, dispersal := range dispersals { assert.Equal(t, dispersal.BatchID, c.minibatcher.CurrentBatchID) assert.Equal(t, dispersal.MinibatchIndex, uint(0)) @@ -420,7 +432,7 @@ func TestDisperseMinibatchFailure(t *testing.T) { assert.NoError(t, dispersal.Error) assert.Len(t, dispersal.Signatures, 1) } - assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1}) + assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1, opId2}) } func TestSendBlobsToOperatorWithRetries(t *testing.T) { @@ -471,10 +483,17 @@ func TestSendBlobsToOperatorWithRetries(t *testing.T) { assert.NoError(t, err) assert.Len(t, signatures, 1) + 
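// Aside (not part of this patch): the expectations restored below rely on testify
// consuming same-method expectations in registration order once their call counts
// (.Twice(), .Once()) are exhausted, which is how "fail twice, then succeed" is
// modeled. A self-contained sketch of the pattern with a hypothetical flaky mock:
package demo

import (
	"errors"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
)

type flaky struct{ mock.Mock }

func (f *flaky) Send() error { return f.Called().Error(0) }

func TestRetrySequence(t *testing.T) {
	f := &flaky{}
	f.On("Send").Return(errors.New("fail")).Twice() // calls 1-2 fail
	f.On("Send").Return(nil).Once()                 // call 3 succeeds

	var err error
	for i := 0; i < 3; i++ {
		if err = f.Send(); err == nil {
			break
		}
	}
	assert.NoError(t, err)
	f.AssertNumberOfCalls(t, "Send", 3)
}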
c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fail")).Twice() + c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*core.Signature{sig}, nil).Once() + signatures, err = c.minibatcher.SendBlobsToOperatorWithRetries(ctx, batch.EncodedBlobs, batch.BatchHeader, batch.State.IndexedOperators[opId2], opId2, 3) + c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 6) + assert.NoError(t, err) + assert.Len(t, signatures, 1) + c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fail")).Times(3) c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*core.Signature{sig}, nil).Once() signatures, err = c.minibatcher.SendBlobsToOperatorWithRetries(ctx, batch.EncodedBlobs, batch.BatchHeader, batch.State.IndexedOperators[opId1], opId1, 3) - c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 6) + c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 9) assert.ErrorContains(t, err, "failed to send chunks to operator") assert.Nil(t, signatures) } @@ -497,15 +516,17 @@ func TestSendBlobsToOperatorWithRetriesCanceled(t *testing.T) { assert.NoError(t, err) batch, err := c.encodingStreamer.CreateMinibatch(ctx) assert.NoError(t, err) + operators, err := c.chainState.GetIndexedOperators(ctx, initialBlock) + assert.NoError(t, err) minibatchIndex := uint(12) c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, context.Canceled) - c.minibatcher.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader, c.minibatcher.CurrentBatchID, minibatchIndex) + c.minibatcher.DisperseBatch(ctx, operators, batch.EncodedBlobs, batch.BatchHeader, c.minibatcher.CurrentBatchID, minibatchIndex) c.pool.StopWait() dispersals, err := c.minibatchStore.GetDispersalsByMinibatch(ctx, c.minibatcher.CurrentBatchID, minibatchIndex) assert.NoError(t, err) - assert.Len(t, dispersals, 2) + assert.Len(t, dispersals, 3) - indexedState, err := mockChainState.GetIndexedOperatorState(ctx, initialBlock, []core.QuorumID{0}) + indexedState, err := mockChainState.GetIndexedOperatorState(ctx, initialBlock, []core.QuorumID{0, 1, 2}) assert.NoError(t, err) assert.Len(t, dispersals, len(indexedState.IndexedOperators)) for _, dispersal := range dispersals { @@ -519,7 +540,7 @@ func TestMinibatcherTooManyPendingRequests(t *testing.T) { ctx := context.Background() mockWorkerPool := &cmock.MockWorkerpool{} // minibatcher with mock worker pool - m, err := batcher.NewMinibatcher(defaultConfig, c.blobStore, c.minibatchStore, c.dispatcher, c.encodingStreamer, mockWorkerPool, c.logger, c.metrics) + m, err := batcher.NewMinibatcher(defaultConfig, c.blobStore, c.minibatchStore, c.dispatcher, c.encodingStreamer, mockWorkerPool, c.logger, c.minibatcher.Metrics) assert.NoError(t, err) mockWorkerPool.On("WaitingQueueSize").Return(int(defaultConfig.MaxNumConnections + 1)).Once() _, err = m.HandleSingleMinibatch(ctx) From 268d14b1325cd41b0419df582975a4873f3d34e7 Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Wed, 28 Aug 2024 00:27:25 -0700 Subject: [PATCH 05/10] Fix tests --- disperser/batcher/batch_confirmer_test.go | 2 +- disperser/batcher/minibatcher.go | 23 +++++++++++++++-------- disperser/batcher/minibatcher_test.go | 4 ++-- disperser/batcher/orchestrator.go | 2 +- 4 files changed, 19 
insertions(+), 12 deletions(-) diff --git a/disperser/batcher/batch_confirmer_test.go b/disperser/batcher/batch_confirmer_test.go index b4a8509b82..4dae027f68 100644 --- a/disperser/batcher/batch_confirmer_test.go +++ b/disperser/batcher/batch_confirmer_test.go @@ -75,7 +75,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents { MaxNumConnections: 10, MaxNumRetriesPerBlob: 2, MaxNumRetriesPerDispersal: 1, - }, blobStore, minibatchStore, dispatcher, encodingStreamer, pool, logger, metrics) + }, blobStore, minibatchStore, dispatcher, mockChainState, encodingStreamer, pool, logger, metrics) assert.NoError(t, err) config := bat.BatchConfirmerConfig{ diff --git a/disperser/batcher/minibatcher.go b/disperser/batcher/minibatcher.go index 0f67171695..0054f61332 100644 --- a/disperser/batcher/minibatcher.go +++ b/disperser/batcher/minibatcher.go @@ -34,6 +34,7 @@ type Minibatcher struct { BlobStore disperser.BlobStore MinibatchStore MinibatchStore Dispatcher disperser.Dispatcher + ChainState core.IndexedChainState EncodingStreamer *EncodingStreamer Pool common.WorkerPool @@ -52,6 +53,7 @@ func NewMinibatcher( blobStore disperser.BlobStore, minibatchStore MinibatchStore, dispatcher disperser.Dispatcher, + chainState core.IndexedChainState, encodingStreamer *EncodingStreamer, workerpool common.WorkerPool, logger logging.Logger, @@ -62,6 +64,7 @@ func NewMinibatcher( BlobStore: blobStore, MinibatchStore: minibatchStore, Dispatcher: dispatcher, + ChainState: chainState, EncodingStreamer: encodingStreamer, Pool: workerpool, @@ -75,11 +78,6 @@ func NewMinibatcher( }, nil } -func (b *Minibatcher) RecoverState(ctx context.Context) error { - //TODO: Implement minibatch recovery - return fmt.Errorf("minibatch state recovery not implemented") -} - func (b *Minibatcher) PopBatchState(batchID uuid.UUID) *BatchState { batchState, ok := b.Batches[batchID] if !ok { @@ -179,7 +177,16 @@ func (b *Minibatcher) HandleSingleMinibatch(ctx context.Context) (context.Cancel err := b.createBlobMinibatchMappings(ctx, b.CurrentBatchID, b.MinibatchIndex, minibatch.BlobMetadata, minibatch.BlobHeaders) storeMappingsChan <- err }() - b.DisperseBatch(dispersalCtx, minibatch.State, minibatch.EncodedBlobs, minibatch.BatchHeader, b.CurrentBatchID, b.MinibatchIndex) + + // Disperse the minibatch to operators in all quorums + // If an operator doesn't have any bundles, it won't receive any chunks but it will still receive blob headers + operatorsAllQuorums, err := b.ChainState.GetIndexedOperators(ctx, b.ReferenceBlockNumber) + if err != nil { + cancelDispersal() + _ = b.handleFailure(ctx, minibatch.BlobMetadata, FailReason("error getting operator state for all quorums")) + return nil, fmt.Errorf("error getting operator state for all quorums: %w", err) + } + b.DisperseBatch(dispersalCtx, operatorsAllQuorums, minibatch.EncodedBlobs, minibatch.BatchHeader, b.CurrentBatchID, b.MinibatchIndex) log.Debug("DisperseBatch took", "duration", time.Since(stageTimer).String()) h, err := minibatch.State.OperatorState.Hash() @@ -204,8 +211,8 @@ func (b *Minibatcher) HandleSingleMinibatch(ctx context.Context) (context.Cancel return cancelDispersal, nil } -func (b *Minibatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, batchID uuid.UUID, minibatchIndex uint) { - for id, op := range state.IndexedOperators { +func (b *Minibatcher) DisperseBatch(ctx context.Context, operators map[core.OperatorID]*core.IndexedOperatorInfo, blobs []core.EncodedBlob, 
batchHeader *core.BatchHeader, batchID uuid.UUID, minibatchIndex uint) { + for id, op := range operators { opInfo := op opID := id req := &MinibatchDispersal{ diff --git a/disperser/batcher/minibatcher_test.go b/disperser/batcher/minibatcher_test.go index afa8472cd6..ff91452d22 100644 --- a/disperser/batcher/minibatcher_test.go +++ b/disperser/batcher/minibatcher_test.go @@ -96,7 +96,7 @@ func newMinibatcher(t *testing.T, config batcher.MinibatcherConfig) *minibatcher assert.NoError(t, err) ethClient := &cmock.MockEthClient{} pool := workerpool.New(int(config.MaxNumConnections)) - m, err := batcher.NewMinibatcher(config, blobStore, minibatchStore, dispatcher, encodingStreamer, pool, logger, metrics) + m, err := batcher.NewMinibatcher(config, blobStore, minibatchStore, dispatcher, chainState, encodingStreamer, pool, logger, metrics) assert.NoError(t, err) return &minibatcherComponents{ @@ -540,7 +540,7 @@ func TestMinibatcherTooManyPendingRequests(t *testing.T) { ctx := context.Background() mockWorkerPool := &cmock.MockWorkerpool{} // minibatcher with mock worker pool - m, err := batcher.NewMinibatcher(defaultConfig, c.blobStore, c.minibatchStore, c.dispatcher, c.encodingStreamer, mockWorkerPool, c.logger, c.minibatcher.Metrics) + m, err := batcher.NewMinibatcher(defaultConfig, c.blobStore, c.minibatchStore, c.dispatcher, c.chainState, c.encodingStreamer, mockWorkerPool, c.logger, c.minibatcher.Metrics) assert.NoError(t, err) mockWorkerPool.On("WaitingQueueSize").Return(int(defaultConfig.MaxNumConnections + 1)).Once() _, err = m.HandleSingleMinibatch(ctx) diff --git a/disperser/batcher/orchestrator.go b/disperser/batcher/orchestrator.go index a24528494a..4eb2732a86 100644 --- a/disperser/batcher/orchestrator.go +++ b/disperser/batcher/orchestrator.go @@ -75,7 +75,7 @@ func NewOrchestrator( return nil, err } - miniBatcher, err := NewMinibatcher(config.MinibatcherConfig, queue, minibatchStore, dispatcher, encodingStreamer, encodingWorkerPool, logger, metrics) + miniBatcher, err := NewMinibatcher(config.MinibatcherConfig, queue, minibatchStore, dispatcher, chainState, encodingStreamer, encodingWorkerPool, logger, metrics) if err != nil { return nil, err } From 8926c1cdb38427df25710aa923ebdfba7f602963 Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Wed, 28 Aug 2024 00:55:31 -0700 Subject: [PATCH 06/10] Debug --- disperser/batcher/minibatcher.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/disperser/batcher/minibatcher.go b/disperser/batcher/minibatcher.go index 0054f61332..410264f6d0 100644 --- a/disperser/batcher/minibatcher.go +++ b/disperser/batcher/minibatcher.go @@ -235,9 +235,12 @@ func (b *Minibatcher) DisperseBatch(ctx context.Context, operators map[core.Oper } b.logger.Debug("Received signatures from operator", "operator", opID.Hex(), "numSignatures", len(signatures)) compressedSignatures := make([][32]byte, 0, len(signatures)) - for idx, signature := range signatures { - b.logger.Debug("Append signature", "operator", opID.Hex(), "blobIndex", idx, "signature", signature) - compressedSignatures = append(compressedSignatures, signature.Bytes()) + for _, signature := range signatures { + if signature == nil { + err = fmt.Errorf("empty signatures") + } else { + compressedSignatures = append(compressedSignatures, signature.Bytes()) + } } // Update the minibatch state err = b.MinibatchStore.UpdateDispersalResponse(ctx, req, &DispersalResponse{ From d19cf6242579c0a8c284284c1fb86152c4da0755 Mon Sep 17 00:00:00 2001 
From d19cf6242579c0a8c284284c1fb86152c4da0755 Mon Sep 17 00:00:00 2001
From: Patrick Schork <354473+pschork@users.noreply.github.com>
Date: Wed, 28 Aug 2024 09:14:29 -0700
Subject: [PATCH 07/10] Cast RespondedAt to Numeric field

---
 disperser/batcher/batchstore/minibatch_store.go | 1 +
 disperser/batcher/minibatcher.go                | 3 +--
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/disperser/batcher/batchstore/minibatch_store.go b/disperser/batcher/batchstore/minibatch_store.go
index 3d42540fb3..79b419a6e2 100644
--- a/disperser/batcher/batchstore/minibatch_store.go
+++ b/disperser/batcher/batchstore/minibatch_store.go
@@ -158,6 +158,7 @@ func MarshalDispersalResponse(response *batcher.DispersalResponse) (map[string]t
 	if err != nil {
 		return nil, err
 	}
+	fields["RespondedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.RespondedAt.UTC().Unix())}
 	return fields, nil
 }

diff --git a/disperser/batcher/minibatcher.go b/disperser/batcher/minibatcher.go
index 410264f6d0..55611e3df4 100644
--- a/disperser/batcher/minibatcher.go
+++ b/disperser/batcher/minibatcher.go
@@ -233,11 +233,10 @@ func (b *Minibatcher) DisperseBatch(ctx context.Context, operators map[core.Oper
 	if err != nil {
 		b.logger.Errorf("failed to send blobs to operator %s: %v", opID.Hex(), err)
 	}
-	b.logger.Debug("Received signatures from operator", "operator", opID.Hex(), "numSignatures", len(signatures))
 	compressedSignatures := make([][32]byte, 0, len(signatures))
 	for _, signature := range signatures {
 		if signature == nil {
-			err = fmt.Errorf("empty signatures")
+			err = fmt.Errorf("empty signature")
 		} else {
 			compressedSignatures = append(compressedSignatures, signature.Bytes())
 		}
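For context on the RespondedAt change: the attributevalue codec encodes time.Time as a string by default, while storing epoch seconds in a Number attribute lets DynamoDB compare and range-query the field numerically (the codec's `dynamodbav:",unixtime"` tag option is a related alternative). A round-trip sketch using the real aws-sdk-go-v2 value types; the two helper functions are illustrative, not part of the codebase:

    package main

    import (
        "fmt"
        "strconv"
        "time"

        "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
    )

    // encodeRespondedAt mirrors the patch: UTC epoch seconds in a Number field.
    func encodeRespondedAt(t time.Time) types.AttributeValue {
        return &types.AttributeValueMemberN{Value: strconv.FormatInt(t.UTC().Unix(), 10)}
    }

    func decodeRespondedAt(av types.AttributeValue) (time.Time, error) {
        n, ok := av.(*types.AttributeValueMemberN)
        if !ok {
            return time.Time{}, fmt.Errorf("expected numeric attribute, got %T", av)
        }
        secs, err := strconv.ParseInt(n.Value, 10, 64)
        if err != nil {
            return time.Time{}, err
        }
        return time.Unix(secs, 0).UTC(), nil
    }

    func main() {
        // Truncate to seconds, as the tests do, since sub-second precision is lost.
        now := time.Now().Truncate(time.Second)
        back, err := decodeRespondedAt(encodeRespondedAt(now))
        fmt.Println(back.Equal(now), err) // true <nil>
    }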
From f1b18883e360d092d1da400952bad52658a1fde5 Mon Sep 17 00:00:00 2001
From: Patrick Schork <354473+pschork@users.noreply.github.com>
Date: Wed, 28 Aug 2024 13:33:05 -0700
Subject: [PATCH 08/10] Marshal/unmarshal dispersal error

---
 .../batcher/batchstore/minibatch_store.go     | 27 +++++++++++++++++
 .../batchstore/minibatch_store_test.go        | 29 +++++++++++++++++++
 disperser/batcher/minibatch_store.go          |  2 +-
 3 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/disperser/batcher/batchstore/minibatch_store.go b/disperser/batcher/batchstore/minibatch_store.go
index 79b419a6e2..9836cdfc63 100644
--- a/disperser/batcher/batchstore/minibatch_store.go
+++ b/disperser/batcher/batchstore/minibatch_store.go
@@ -2,6 +2,7 @@ package batchstore

 import (
 	"context"
+	"errors"
 	"fmt"
 	"strconv"
 	"time"
@@ -145,6 +146,9 @@ func MarshalDispersal(response *batcher.MinibatchDispersal) (map[string]types.At
 	if err != nil {
 		return nil, err
 	}
+	if response.Error != nil {
+		fields["Error"] = &types.AttributeValueMemberS{Value: response.DispersalResponse.Error.Error()}
+	}
 	fields["BatchID"] = &types.AttributeValueMemberS{Value: response.BatchID.String()}
 	fields["SK"] = &types.AttributeValueMemberS{Value: dispersalSKPrefix + fmt.Sprintf("%d#%s", response.MinibatchIndex, response.OperatorID.Hex())}
 	fields["OperatorID"] = &types.AttributeValueMemberS{Value: response.OperatorID.Hex()}
@@ -158,6 +162,7 @@ func MarshalDispersalResponse(response *batcher.DispersalResponse) (map[string]t
 	if err != nil {
 		return nil, err
 	}
+	fields["Error"] = &types.AttributeValueMemberS{Value: response.Error.Error()}
 	fields["RespondedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.RespondedAt.UTC().Unix())}
 	return fields, nil
 }
@@ -193,6 +198,22 @@ func UnmarshalBatchID(item commondynamodb.Item) (*uuid.UUID, error) {
 	return &batchID, nil
 }

+func UnmarshalError(item commondynamodb.Item) (error, error) {
+	type Error struct {
+		Error string
+	}
+	var e Error
+	err := attributevalue.UnmarshalMap(item, &e)
+	if err != nil {
+		return nil, err
+	}
+
+	if e.Error == "" {
+		return nil, nil
+	}
+	return errors.New(e.Error), nil
+}
+
 func UnmarshalBlobKey(item commondynamodb.Item) (*disperser.BlobKey, error) {
 	blobKey := disperser.BlobKey{}
 	err := attributevalue.UnmarshalMap(item, &blobKey)
@@ -258,6 +279,11 @@ func UnmarshalDispersal(item commondynamodb.Item) (*batcher.MinibatchDispersal,
 	}
 	response.OperatorID = *operatorID

+	dispersalError, err := UnmarshalError(item)
+	if err != nil {
+		return nil, err
+	}
+	response.Error = dispersalError
 	response.RespondedAt = response.RespondedAt.UTC()
 	response.RequestedAt = response.RequestedAt.UTC()
 	return &response, nil
@@ -308,6 +335,7 @@ func (m *MinibatchStore) UpdateDispersalResponse(ctx context.Context, dispersal
 	if err != nil {
 		return err
 	}
+	m.logger.Info("updating dispersal response", "batchID", dispersal.BatchID, "response", marshaledResponse)
 	_, err = m.dynamoDBClient.UpdateItem(ctx, m.tableName, map[string]types.AttributeValue{
 		"BatchID": &types.AttributeValueMemberS{
 			Value: dispersal.BatchID.String(),
diff --git a/disperser/batcher/batchstore/minibatch_store_test.go b/disperser/batcher/batchstore/minibatch_store_test.go
index 27583f0de7..82db267a33 100644
--- a/disperser/batcher/batchstore/minibatch_store_test.go
+++ b/disperser/batcher/batchstore/minibatch_store_test.go
@@ -2,6 +2,7 @@ package batchstore_test

 import (
 	"context"
+	"errors"
 	"fmt"
 	"os"
 	"testing"
@@ -237,6 +238,34 @@ func TestPutDispersal(t *testing.T) {
 	assert.Equal(t, response, r)
 }

+func TestDispersalError(t *testing.T) {
+	ctx := context.Background()
+	id, err := uuid.NewV7()
+	assert.NoError(t, err)
+	ts := time.Now().Truncate(time.Second).UTC()
+	opID := core.OperatorID([32]byte{123})
+	response := &batcher.MinibatchDispersal{
+		BatchID:         id,
+		MinibatchIndex:  0,
+		OperatorID:      opID,
+		OperatorAddress: gcommon.HexToAddress("0x0"),
+		Socket:          "socket",
+		NumBlobs:        1,
+		RequestedAt:     ts,
+		DispersalResponse: batcher.DispersalResponse{
+			Signatures:  nil,
+			RespondedAt: ts,
+			Error:       errors.New("error"),
+		},
+	}
+	err = minibatchStore.PutDispersal(ctx, response)
+	assert.NoError(t, err)
+	r, err := minibatchStore.GetDispersal(ctx, response.BatchID, response.MinibatchIndex, opID)
+	assert.NoError(t, err)
+	assert.Equal(t, response, r)
+	assert.Equal(t, response.DispersalResponse.Error.Error(), r.DispersalResponse.Error.Error())
+}
+
 func TestDispersalStatus(t *testing.T) {
 	ctx := context.Background()
 	id, err := uuid.NewV7()
diff --git a/disperser/batcher/minibatch_store.go b/disperser/batcher/minibatch_store.go
index fca656af9a..d69af82a6c 100644
--- a/disperser/batcher/minibatch_store.go
+++ b/disperser/batcher/minibatch_store.go
@@ -45,7 +45,7 @@ type DispersalResponse struct {
 	// The signatures are compressed using G1Affine.Bytes(), to be decompressed using G1Affine.SetBytes()
 	Signatures  [][32]byte
 	RespondedAt time.Time
-	Error       error
+	Error       error `dynamodbav:"-"`
 }

 type MinibatchDispersal struct {
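The shape of the error handling above: Go error values have no stable wire form, so the field is tagged `dynamodbav:"-"` to keep it out of the codec, the message is written by hand as a String attribute, and UnmarshalError rebuilds it with errors.New, which means only the message survives the round trip, not the concrete error type. A self-contained sketch of the same technique; DispersalRecord and the helpers are stand-ins, not the real types:

    package main

    import (
        "errors"
        "fmt"

        "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
        "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
    )

    type DispersalRecord struct {
        OperatorID string
        Err        error `dynamodbav:"-"` // skipped by the codec
    }

    func marshalRecord(r *DispersalRecord) (map[string]types.AttributeValue, error) {
        item, err := attributevalue.MarshalMap(r)
        if err != nil {
            return nil, err
        }
        if r.Err != nil {
            // Persist only the message; the concrete error type is lost.
            item["Error"] = &types.AttributeValueMemberS{Value: r.Err.Error()}
        }
        return item, nil
    }

    func unmarshalRecord(item map[string]types.AttributeValue) (*DispersalRecord, error) {
        var r DispersalRecord
        if err := attributevalue.UnmarshalMap(item, &r); err != nil {
            return nil, err
        }
        // Rebuild the error from its stored message, as UnmarshalError does.
        var shadow struct{ Error string }
        if err := attributevalue.UnmarshalMap(item, &shadow); err != nil {
            return nil, err
        }
        if shadow.Error != "" {
            r.Err = errors.New(shadow.Error)
        }
        return &r, nil
    }

    func main() {
        item, _ := marshalRecord(&DispersalRecord{OperatorID: "op-1", Err: errors.New("dial timeout")})
        back, _ := unmarshalRecord(item)
        fmt.Println(back.OperatorID, back.Err) // op-1 dial timeout
    }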
From f543816e2a48400a997d3c3e48a1e2102c2322c0 Mon Sep 17 00:00:00 2001
From: Patrick Schork <354473+pschork@users.noreply.github.com>
Date: Wed, 28 Aug 2024 13:56:20 -0700
Subject: [PATCH 09/10] Debug

---
 disperser/batcher/minibatcher.go | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/disperser/batcher/minibatcher.go b/disperser/batcher/minibatcher.go
index 55611e3df4..639e6db3a0 100644
--- a/disperser/batcher/minibatcher.go
+++ b/disperser/batcher/minibatcher.go
@@ -234,9 +234,10 @@ func (b *Minibatcher) DisperseBatch(ctx context.Context, operators map[core.Oper
 		b.logger.Errorf("failed to send blobs to operator %s: %v", opID.Hex(), err)
 	}
 	compressedSignatures := make([][32]byte, 0, len(signatures))
-	for _, signature := range signatures {
+	b.logger.Info("received signatures", "operator", opID.Hex(), "numSignatures", len(signatures), "signatures", signatures)
+	for idx, signature := range signatures {
 		if signature == nil {
-			err = fmt.Errorf("empty signature")
+			err = fmt.Errorf("empty signature at index %d", idx)
 		} else {
 			compressedSignatures = append(compressedSignatures, signature.Bytes())
 		}
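One subtlety in this loop: err is overwritten on each nil signature, so only the last failing index reaches the stored dispersal response. If every missing signature needed reporting, errors.Join (Go 1.20+) could accumulate them; a sketch under that assumption, with collect as a hypothetical helper rather than the project's actual behavior:

    package main

    import (
        "errors"
        "fmt"
    )

    func collect(signatures [][]byte) ([][]byte, error) {
        out := make([][]byte, 0, len(signatures))
        var errs []error
        for idx, sig := range signatures {
            if sig == nil {
                errs = append(errs, fmt.Errorf("empty signature at index %d", idx))
                continue
            }
            out = append(out, sig)
        }
        // errors.Join returns nil when errs is empty.
        return out, errors.Join(errs...)
    }

    func main() {
        _, err := collect([][]byte{nil, {0x1}, nil})
        fmt.Println(err)
        // empty signature at index 0
        // empty signature at index 2
    }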
"operator", opID.Hex(), "numSignatures", len(signatures), "signatures", signatures) - for idx, signature := range signatures { + for _, signature := range signatures { if signature == nil { - err = fmt.Errorf("empty signature at index %d", idx) + compressedSignatures = append(compressedSignatures, [32]byte{}) } else { compressedSignatures = append(compressedSignatures, signature.Bytes()) }