diff --git a/disperser/batcher/inmem/minibatch_store.go b/disperser/batcher/inmem/minibatch_store.go index c11e7b2406..6a33d70076 100644 --- a/disperser/batcher/inmem/minibatch_store.go +++ b/disperser/batcher/inmem/minibatch_store.go @@ -1,8 +1,11 @@ package inmem import ( + "context" "fmt" + "sync" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/disperser/batcher" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/google/uuid" @@ -14,10 +17,11 @@ type minibatchStore struct { // MinibatchRecords maps batch IDs to a map from minibatch indices to minibatch records MinibatchRecords map[uuid.UUID]map[uint]*batcher.MinibatchRecord // DispersalRequests maps batch IDs to a map from minibatch indices to dispersal requests - DispersalRequests map[uuid.UUID]map[uint]*batcher.DispersalRequest + DispersalRequests map[uuid.UUID]map[uint][]*batcher.DispersalRequest // DispersalResponses maps batch IDs to a map from minibatch indices to dispersal responses - DispersalResponses map[uuid.UUID]map[uint]*batcher.DispersalResponse + DispersalResponses map[uuid.UUID]map[uint][]*batcher.DispersalResponse + mu sync.RWMutex logger logging.Logger } @@ -27,20 +31,26 @@ func NewMinibatchStore(logger logging.Logger) batcher.MinibatchStore { return &minibatchStore{ BatchRecords: make(map[uuid.UUID]*batcher.BatchRecord), MinibatchRecords: make(map[uuid.UUID]map[uint]*batcher.MinibatchRecord), - DispersalRequests: make(map[uuid.UUID]map[uint]*batcher.DispersalRequest), - DispersalResponses: make(map[uuid.UUID]map[uint]*batcher.DispersalResponse), + DispersalRequests: make(map[uuid.UUID]map[uint][]*batcher.DispersalRequest), + DispersalResponses: make(map[uuid.UUID]map[uint][]*batcher.DispersalResponse), logger: logger, } } -func (m *minibatchStore) PutBatch(batch *batcher.BatchRecord) error { +func (m *minibatchStore) PutBatch(ctx context.Context, batch *batcher.BatchRecord) error { + m.mu.Lock() + defer m.mu.Unlock() + m.BatchRecords[batch.ID] = batch return nil } -func 
(m *minibatchStore) GetBatch(batchID uuid.UUID) (*batcher.BatchRecord, error) { +func (m *minibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batcher.BatchRecord, error) { + m.mu.RLock() + defer m.mu.RUnlock() + b, ok := m.BatchRecords[batchID] if !ok { return nil, fmt.Errorf("batch not found") @@ -48,7 +58,10 @@ func (m *minibatchStore) GetBatch(batchID uuid.UUID) (*batcher.BatchRecord, erro return b, nil } -func (m *minibatchStore) PutMiniBatch(minibatch *batcher.MinibatchRecord) error { +func (m *minibatchStore) PutMinibatch(ctx context.Context, minibatch *batcher.MinibatchRecord) error { + m.mu.Lock() + defer m.mu.Unlock() + if _, ok := m.MinibatchRecords[minibatch.BatchID]; !ok { m.MinibatchRecords[minibatch.BatchID] = make(map[uint]*batcher.MinibatchRecord) } @@ -57,23 +70,61 @@ func (m *minibatchStore) PutMiniBatch(minibatch *batcher.MinibatchRecord) error return nil } -func (m *minibatchStore) GetMiniBatch(batchID uuid.UUID, minibatchIndex uint) (*batcher.MinibatchRecord, error) { +func (m *minibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.MinibatchRecord, error) { + m.mu.RLock() + defer m.mu.RUnlock() + if _, ok := m.MinibatchRecords[batchID]; !ok { return nil, nil } return m.MinibatchRecords[batchID][minibatchIndex], nil } -func (m *minibatchStore) PutDispersalRequest(request *batcher.DispersalRequest) error { +func (m *minibatchStore) PutDispersalRequest(ctx context.Context, request *batcher.DispersalRequest) error { + m.mu.Lock() + defer m.mu.Unlock() + if _, ok := m.DispersalRequests[request.BatchID]; !ok { - m.DispersalRequests[request.BatchID] = make(map[uint]*batcher.DispersalRequest) + m.DispersalRequests[request.BatchID] = make(map[uint][]*batcher.DispersalRequest) + } + + if _, ok := m.DispersalRequests[request.BatchID][request.MinibatchIndex]; !ok { + m.DispersalRequests[request.BatchID][request.MinibatchIndex] = make([]*batcher.DispersalRequest, 0) + } + + for _, r := range 
m.DispersalRequests[request.BatchID][request.MinibatchIndex] { + if r.OperatorID == request.OperatorID { + // replace existing record + *r = *request + return nil + } } - m.DispersalRequests[request.BatchID][request.MinibatchIndex] = request + + m.DispersalRequests[request.BatchID][request.MinibatchIndex] = append(m.DispersalRequests[request.BatchID][request.MinibatchIndex], request) return nil } -func (m *minibatchStore) GetDispersalRequest(batchID uuid.UUID, minibatchIndex uint) (*batcher.DispersalRequest, error) { +func (m *minibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*batcher.DispersalRequest, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + // Scan the stored slice directly instead of calling GetDispersalRequests: + // that method takes m.mu.RLock() again, and a recursive read lock can + // deadlock once a writer is queued between the two acquisitions. + // A missing batch or minibatch index yields a nil slice, so the loop is skipped. + for _, r := range m.DispersalRequests[batchID][minibatchIndex] { + if r.OperatorID == opID { + return r, nil + } + } + return nil, nil +} + +func (m *minibatchStore) GetDispersalRequests(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalRequest, error) { + m.mu.RLock() + defer m.mu.RUnlock() + if _, ok := m.DispersalRequests[batchID]; !ok { return nil, nil } @@ -81,16 +132,51 @@ func (m *minibatchStore) GetDispersalRequest(batchID uuid.UUID, minibatchIndex u return m.DispersalRequests[batchID][minibatchIndex], nil } -func (m *minibatchStore) PutDispersalResponse(response *batcher.DispersalResponse) error { +func (m *minibatchStore) PutDispersalResponse(ctx context.Context, response *batcher.DispersalResponse) error { + m.mu.Lock() + defer m.mu.Unlock() + if _, ok := m.DispersalResponses[response.BatchID]; !ok { - m.DispersalResponses[response.BatchID] = make(map[uint]*batcher.DispersalResponse) + m.DispersalResponses[response.BatchID] = make(map[uint][]*batcher.DispersalResponse) + } + + if _, ok := m.DispersalResponses[response.BatchID][response.MinibatchIndex]; !ok { + m.DispersalResponses[response.BatchID][response.MinibatchIndex] =
make([]*batcher.DispersalResponse, 0) } - m.DispersalResponses[response.BatchID][response.MinibatchIndex] = response + + for _, r := range m.DispersalResponses[response.BatchID][response.MinibatchIndex] { + if r.OperatorID == response.OperatorID { + // replace existing record + *r = *response + return nil + } + } + + m.DispersalResponses[response.BatchID][response.MinibatchIndex] = append(m.DispersalResponses[response.BatchID][response.MinibatchIndex], response) return nil } -func (m *minibatchStore) GetDispersalResponse(batchID uuid.UUID, minibatchIndex uint) (*batcher.DispersalResponse, error) { +func (m *minibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*batcher.DispersalResponse, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + // Scan the stored slice directly instead of calling GetDispersalResponses: + // that method takes m.mu.RLock() again, and a recursive read lock can + // deadlock once a writer is queued between the two acquisitions. + // A missing batch or minibatch index yields a nil slice, so the loop is skipped. + for _, r := range m.DispersalResponses[batchID][minibatchIndex] { + if r.OperatorID == opID { + return r, nil + } + } + return nil, nil +} + +func (m *minibatchStore) GetDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalResponse, error) { + m.mu.RLock() + defer m.mu.RUnlock() + if _, ok := m.DispersalResponses[batchID]; !ok { return nil, nil } @@ -98,6 +184,9 @@ func (m *minibatchStore) GetDispersalResponse(batchID uuid.UUID, minibatchIndex return m.DispersalResponses[batchID][minibatchIndex], nil } -func (m *minibatchStore) GetPendingBatch() (*batcher.BatchRecord, error) { +func (m *minibatchStore) GetPendingBatch(ctx context.Context) (*batcher.BatchRecord, error) { + m.mu.RLock() + defer m.mu.RUnlock() + return nil, nil } diff --git a/disperser/batcher/inmem/minibatch_store_test.go b/disperser/batcher/inmem/minibatch_store_test.go index dbe0a01054..a6693c1118 100644 --- a/disperser/batcher/inmem/minibatch_store_test.go +++ b/disperser/batcher/inmem/minibatch_store_test.go @@ -1,6 +1,7 @@ package inmem_test import ( + "context" "testing"
"time" @@ -29,9 +30,10 @@ func TestPutBatch(t *testing.T) { AggregatePubKey: nil, AggregateSignature: nil, } - err = s.PutBatch(batch) + ctx := context.Background() + err = s.PutBatch(ctx, batch) assert.NoError(t, err) - b, err := s.GetBatch(batch.ID) + b, err := s.GetBatch(ctx, batch.ID) assert.NoError(t, err) assert.Equal(t, batch, b) } @@ -47,9 +49,10 @@ func TestPutMiniBatch(t *testing.T) { BatchSize: 1, ReferenceBlockNumber: 1, } - err = s.PutMiniBatch(minibatch) + ctx := context.Background() + err = s.PutMinibatch(ctx, minibatch) assert.NoError(t, err) - m, err := s.GetMiniBatch(minibatch.BatchID, minibatch.MinibatchIndex) + m, err := s.GetMinibatch(ctx, minibatch.BatchID, minibatch.MinibatchIndex) assert.NoError(t, err) assert.Equal(t, minibatch, m) } @@ -58,29 +61,54 @@ func TestPutDispersalRequest(t *testing.T) { s := newMinibatchStore() id, err := uuid.NewV7() assert.NoError(t, err) - request := &batcher.DispersalRequest{ + minibatchIndex := uint(0) + ctx := context.Background() + req1 := &batcher.DispersalRequest{ BatchID: id, - MinibatchIndex: 0, + MinibatchIndex: minibatchIndex, OperatorID: core.OperatorID([32]byte{1}), OperatorAddress: gcommon.HexToAddress("0x0"), NumBlobs: 1, RequestedAt: time.Now().UTC(), } - err = s.PutDispersalRequest(request) + err = s.PutDispersalRequest(ctx, req1) assert.NoError(t, err) - r, err := s.GetDispersalRequest(request.BatchID, request.MinibatchIndex) + req2 := &batcher.DispersalRequest{ + BatchID: id, + MinibatchIndex: minibatchIndex, + OperatorID: core.OperatorID([32]byte{2}), + OperatorAddress: gcommon.HexToAddress("0x0"), + NumBlobs: 1, + RequestedAt: time.Now().UTC(), + } + err = s.PutDispersalRequest(ctx, req2) + assert.NoError(t, err) + + r, err := s.GetDispersalRequests(ctx, id, minibatchIndex) assert.NoError(t, err) - assert.Equal(t, request, r) + assert.Len(t, r, 2) + assert.Equal(t, req1, r[0]) + assert.Equal(t, req2, r[1]) + + req, err := s.GetDispersalRequest(ctx, id, minibatchIndex, req1.OperatorID) + 
assert.NoError(t, err) + assert.Equal(t, req1, req) + + req, err = s.GetDispersalRequest(ctx, id, minibatchIndex, req2.OperatorID) + assert.NoError(t, err) + assert.Equal(t, req2, req) } func TestPutDispersalResponse(t *testing.T) { s := newMinibatchStore() id, err := uuid.NewV7() assert.NoError(t, err) - response := &batcher.DispersalResponse{ + ctx := context.Background() + minibatchIndex := uint(0) + resp1 := &batcher.DispersalResponse{ DispersalRequest: batcher.DispersalRequest{ BatchID: id, - MinibatchIndex: 0, + MinibatchIndex: minibatchIndex, OperatorID: core.OperatorID([32]byte{1}), OperatorAddress: gcommon.HexToAddress("0x0"), NumBlobs: 1, @@ -90,9 +118,33 @@ func TestPutDispersalResponse(t *testing.T) { RespondedAt: time.Now().UTC(), Error: nil, } - err = s.PutDispersalResponse(response) + resp2 := &batcher.DispersalResponse{ + DispersalRequest: batcher.DispersalRequest{ + BatchID: id, + MinibatchIndex: minibatchIndex, + OperatorID: core.OperatorID([32]byte{2}), + OperatorAddress: gcommon.HexToAddress("0x0"), + NumBlobs: 1, + RequestedAt: time.Now().UTC(), + }, + Signatures: nil, + RespondedAt: time.Now().UTC(), + Error: nil, + } + err = s.PutDispersalResponse(ctx, resp1) assert.NoError(t, err) - r, err := s.GetDispersalResponse(response.BatchID, response.MinibatchIndex) + err = s.PutDispersalResponse(ctx, resp2) + assert.NoError(t, err) + + r, err := s.GetDispersalResponses(ctx, id, minibatchIndex) + assert.NoError(t, err) + assert.Len(t, r, 2) + + resp, err := s.GetDispersalResponse(ctx, id, minibatchIndex, resp1.OperatorID) + assert.NoError(t, err) + assert.Equal(t, resp1, resp) + + resp, err = s.GetDispersalResponse(ctx, id, minibatchIndex, resp2.OperatorID) assert.NoError(t, err) - assert.Equal(t, response, r) + assert.Equal(t, resp2, resp) } diff --git a/disperser/batcher/minibatch_store.go b/disperser/batcher/minibatch_store.go index a3b45e20f7..1409b7ca07 100644 --- a/disperser/batcher/minibatch_store.go +++ b/disperser/batcher/minibatch_store.go 
@@ -1,6 +1,7 @@ package batcher import ( + "context" "time" "github.com/Layr-Labs/eigenda/core" @@ -30,6 +31,7 @@ type DispersalRequest struct { MinibatchIndex uint core.OperatorID OperatorAddress gcommon.Address + Socket string NumBlobs uint RequestedAt time.Time } @@ -42,13 +44,15 @@ type DispersalResponse struct { } type MinibatchStore interface { - PutBatch(batch *BatchRecord) error - GetBatch(batchID uuid.UUID) (*BatchRecord, error) - PutMiniBatch(minibatch *MinibatchRecord) error - GetMiniBatch(batchID uuid.UUID, minibatchIndex uint) (*MinibatchRecord, error) - PutDispersalRequest(request *DispersalRequest) error - GetDispersalRequest(batchID uuid.UUID, minibatchIndex uint) (*DispersalRequest, error) - PutDispersalResponse(response *DispersalResponse) error - GetDispersalResponse(batchID uuid.UUID, minibatchIndex uint) (*DispersalResponse, error) - GetPendingBatch() (*BatchRecord, error) + PutBatch(ctx context.Context, batch *BatchRecord) error + GetBatch(ctx context.Context, batchID uuid.UUID) (*BatchRecord, error) + PutMinibatch(ctx context.Context, minibatch *MinibatchRecord) error + GetMinibatch(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*MinibatchRecord, error) + PutDispersalRequest(ctx context.Context, request *DispersalRequest) error + GetDispersalRequest(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*DispersalRequest, error) + GetDispersalRequests(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*DispersalRequest, error) + PutDispersalResponse(ctx context.Context, response *DispersalResponse) error + GetDispersalResponse(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*DispersalResponse, error) + GetDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*DispersalResponse, error) + GetPendingBatch(ctx context.Context) (*BatchRecord, error) } diff --git a/disperser/batcher/minibatcher.go 
b/disperser/batcher/minibatcher.go new file mode 100644 index 0000000000..a00d054199 --- /dev/null +++ b/disperser/batcher/minibatcher.go @@ -0,0 +1,296 @@ +package batcher + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/google/uuid" + "github.com/hashicorp/go-multierror" +) + +type MinibatcherConfig struct { + PullInterval time.Duration + MaxNumConnections uint + MaxNumRetriesPerBlob uint + MaxNumRetriesPerDispersal uint +} + +type Minibatcher struct { + MinibatcherConfig + + BlobStore disperser.BlobStore + MinibatchStore MinibatchStore + Dispatcher disperser.Dispatcher + ChainState core.IndexedChainState + AssignmentCoordinator core.AssignmentCoordinator + EncodingStreamer *EncodingStreamer + Pool common.WorkerPool + + // local state + ReferenceBlockNumber uint + BatchID uuid.UUID + MinibatchIndex uint + + ethClient common.EthClient + logger logging.Logger +} + +func NewMinibatcher( + config MinibatcherConfig, + blobStore disperser.BlobStore, + minibatchStore MinibatchStore, + dispatcher disperser.Dispatcher, + chainState core.IndexedChainState, + assignmentCoordinator core.AssignmentCoordinator, + // aggregator core.SignatureAggregator, + encodingStreamer *EncodingStreamer, + ethClient common.EthClient, + workerpool common.WorkerPool, + logger logging.Logger, +) (*Minibatcher, error) { + return &Minibatcher{ + MinibatcherConfig: config, + BlobStore: blobStore, + MinibatchStore: minibatchStore, + Dispatcher: dispatcher, + ChainState: chainState, + AssignmentCoordinator: assignmentCoordinator, + EncodingStreamer: encodingStreamer, + Pool: workerpool, + ReferenceBlockNumber: 0, + MinibatchIndex: 0, + + ethClient: ethClient, + logger: logger.With("component", "Minibatcher"), + }, nil +} + +func (b *Minibatcher) Start(ctx context.Context) error { + err := b.ChainState.Start(ctx) + if 
err != nil { + return err + } + // Wait for few seconds for indexer to index blockchain + // This won't be needed when we switch to using Graph node + time.Sleep(indexerWarmupDelay) + go func() { + ticker := time.NewTicker(b.PullInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := b.HandleSingleBatch(ctx); err != nil { + if errors.Is(err, errNoEncodedResults) { + b.logger.Warn("no encoded results to make a batch with") + } else { + b.logger.Error("failed to process a batch", "err", err) + } + } + } + } + }() + + return nil +} + +func (b *Minibatcher) handleFailure(ctx context.Context, blobMetadatas []*disperser.BlobMetadata, reason FailReason) error { + var result *multierror.Error + numPermanentFailures := 0 + for _, metadata := range blobMetadatas { + b.EncodingStreamer.RemoveEncodedBlob(metadata) + retry, err := b.BlobStore.HandleBlobFailure(ctx, metadata, b.MaxNumRetriesPerBlob) + if err != nil { + b.logger.Error("HandleSingleBatch: error handling blob failure", "err", err) + // Append the error + result = multierror.Append(result, err) + } + + if retry { + continue + } + numPermanentFailures++ + } + + // Return the error(s) + return result.ErrorOrNil() +} + +func (b *Minibatcher) HandleSingleBatch(ctx context.Context) error { + log := b.logger + // If too many dispersal requests are pending, skip an iteration + if pending := b.Pool.WaitingQueueSize(); pending > int(b.MaxNumConnections) { + return fmt.Errorf("too many pending requests %d with max number of connections %d. 
skipping minibatch iteration", pending, b.MaxNumConnections) + } + stageTimer := time.Now() + // All blobs in this batch are marked as DISPERSING + batch, err := b.EncodingStreamer.CreateMinibatch(ctx) + if err != nil { + return err + } + log.Debug("CreateMinibatch took", "duration", time.Since(stageTimer).String()) + + if b.ReferenceBlockNumber < batch.BatchHeader.ReferenceBlockNumber { + b.BatchID, err = uuid.NewV7() + if err != nil { + _ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error generating batch UUID")) + return fmt.Errorf("error generating batch ID: %w", err) + } + batchHeaderHash, err := batch.BatchHeader.GetBatchHeaderHash() + if err != nil { + _ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error getting batch header hash")) + return fmt.Errorf("error getting batch header hash: %w", err) + } + b.MinibatchIndex = 0 + b.ReferenceBlockNumber = batch.BatchHeader.ReferenceBlockNumber + err = b.MinibatchStore.PutBatch(ctx, &BatchRecord{ + ID: b.BatchID, + CreatedAt: time.Now().UTC(), + ReferenceBlockNumber: b.ReferenceBlockNumber, + HeaderHash: batchHeaderHash, + }) + if err != nil { + _ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error storing batch record")) + return fmt.Errorf("error storing batch record: %w", err) + } + } + + // Store minibatch record + blobHeaderHashes := make([][32]byte, 0, len(batch.EncodedBlobs)) + batchSize := int64(0) + for _, blob := range batch.EncodedBlobs { + h, err := blob.BlobHeader.GetBlobHeaderHash() + if err != nil { + _ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error getting blob header hash")) + return fmt.Errorf("error getting blob header hash: %w", err) + } + blobHeaderHashes = append(blobHeaderHashes, h) + batchSize += blob.BlobHeader.EncodedSizeAllQuorums() + } + err = b.MinibatchStore.PutMinibatch(ctx, &MinibatchRecord{ + BatchID: b.BatchID, + MinibatchIndex: b.MinibatchIndex, + BlobHeaderHashes: blobHeaderHashes, + BatchSize: uint64(batchSize), + ReferenceBlockNumber: 
b.ReferenceBlockNumber, + }) + if err != nil { + _ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error storing minibatch record")) + return fmt.Errorf("error storing minibatch record: %w", err) + } + + // Dispatch encoded batch + log.Debug("Dispatching encoded batch...", "batchID", b.BatchID, "minibatchIndex", b.MinibatchIndex, "referenceBlockNumber", b.ReferenceBlockNumber, "numBlobs", len(batch.EncodedBlobs)) + stageTimer = time.Now() + b.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader, b.BatchID, b.MinibatchIndex) + log.Debug("DisperseBatch took", "duration", time.Since(stageTimer).String()) + + h, err := batch.State.OperatorState.Hash() + if err != nil { + log.Error("error getting operator state hash", "err", err) + } + hStr := make([]string, 0, len(h)) + for q, hash := range h { + hStr = append(hStr, fmt.Sprintf("%d: %x", q, hash)) + } + log.Info("Successfully dispatched minibatch", "operatorStateHash", hStr) + + b.MinibatchIndex++ + + return nil +} + +func (b *Minibatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, batchID uuid.UUID, minibatchIndex uint) { + for id, op := range state.IndexedOperators { + opInfo := op + opID := id + b.Pool.Submit(func() { + req := &DispersalRequest{ + BatchID: batchID, + MinibatchIndex: minibatchIndex, + OperatorID: opID, + Socket: opInfo.Socket, // per-iteration copy: before Go 1.22 the range variable op is shared by every queued task + NumBlobs: uint(len(blobs)), + RequestedAt: time.Now().UTC(), + } + err := b.MinibatchStore.PutDispersalRequest(ctx, req) + if err != nil { + b.logger.Error("failed to put dispersal request", "err", err) + return + } + signatures, err := b.SendBlobsToOperatorWithRetries(ctx, blobs, batchHeader, opInfo, opID, int(b.MaxNumRetriesPerDispersal)) + + // Update the minibatch state + err = b.MinibatchStore.PutDispersalResponse(ctx, &DispersalResponse{ + DispersalRequest: *req, + Signatures: signatures, + RespondedAt: time.Now().UTC(), + Error: err, + }) + if err != nil { +
b.logger.Error("failed to put dispersal response", "err", err) + } + }) + } +} + +func (b *Minibatcher) SendBlobsToOperatorWithRetries( + ctx context.Context, + blobs []core.EncodedBlob, + batchHeader *core.BatchHeader, + op *core.IndexedOperatorInfo, + opID core.OperatorID, + maxNumRetries int, +) ([]*core.Signature, error) { + blobMessages := make([]*core.BlobMessage, 0) + hasAnyBundles := false + for _, blob := range blobs { + if _, ok := blob.BundlesByOperator[opID]; ok { + hasAnyBundles = true + } + blobMessages = append(blobMessages, &core.BlobMessage{ + BlobHeader: blob.BlobHeader, + // Bundles will be empty if the operator is not in the quorums blob is dispersed on + Bundles: blob.BundlesByOperator[opID], + }) + } + if !hasAnyBundles { + // Operator is not part of any quorum, no need to send chunks + return nil, fmt.Errorf("operator %s is not part of any quorum", opID.Hex()) + } + + numRetries := 0 + // initially set the timeout equal to the pull interval with exponential backoff + timeout := b.PullInterval + var signatures []*core.Signature + var err error + for numRetries < maxNumRetries { + requestedAt := time.Now() + ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout) + signatures, err = b.Dispatcher.SendBlobsToOperator(ctxWithTimeout, blobMessages, batchHeader, op) + cancel() // release this attempt's timer right away; a defer inside the loop would hold every attempt's context until return + latencyMs := float64(time.Since(requestedAt).Milliseconds()) + if err != nil { + b.logger.Error("error sending chunks to operator", "operator", opID.Hex(), "err", err, "timeout", timeout.String(), "numRetries", numRetries, "maxNumRetries", maxNumRetries) + numRetries++ + timeout *= 2 + continue + } + b.logger.Info("sent chunks to operator", "operator", opID.Hex(), "latencyMs", latencyMs) + break + } + + if signatures == nil && err != nil { + return nil, fmt.Errorf("failed to send chunks to operator %s: %w", opID.Hex(), err) + } + + return signatures, nil +} diff --git a/disperser/batcher/minibatcher_test.go b/disperser/batcher/minibatcher_test.go new file mode
100644 index 0000000000..0059d3815a --- /dev/null +++ b/disperser/batcher/minibatcher_test.go @@ -0,0 +1,357 @@ +package batcher_test + +import ( + "context" + "errors" + "math/big" + "testing" + "time" + + cmock "github.com/Layr-Labs/eigenda/common/mock" + commonmock "github.com/Layr-Labs/eigenda/common/mock" + "github.com/Layr-Labs/eigenda/core" + coremock "github.com/Layr-Labs/eigenda/core/mock" + "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigenda/disperser/batcher" + "github.com/Layr-Labs/eigenda/disperser/batcher/inmem" + dinmem "github.com/Layr-Labs/eigenda/disperser/common/inmem" + dmock "github.com/Layr-Labs/eigenda/disperser/mock" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/consensys/gnark-crypto/ecc/bn254" + "github.com/consensys/gnark-crypto/ecc/bn254/fp" + "github.com/gammazero/workerpool" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +var ( + opId0, _ = core.OperatorIDFromHex("e22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311") + opId1, _ = core.OperatorIDFromHex("e23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568312") + mockChainState, _ = coremock.NewChainDataMock(map[uint8]map[core.OperatorID]int{ + 0: { + opId0: 1, + opId1: 1, + }, + 1: { + opId0: 1, + }, + }) + defaultConfig = batcher.MinibatcherConfig{ + PullInterval: 1 * time.Second, + MaxNumConnections: 3, + MaxNumRetriesPerDispersal: 3, + } + initialBlock = uint(10) +) + +type minibatcherComponents struct { + minibatcher *batcher.Minibatcher + blobStore disperser.BlobStore + minibatchStore batcher.MinibatchStore + dispatcher *dmock.Dispatcher + chainState *coremock.MockIndexedChainState + assignmentCoordinator core.AssignmentCoordinator + encodingStreamer *batcher.EncodingStreamer + pool *workerpool.WorkerPool + ethClient *commonmock.MockEthClient + logger logging.Logger +} + +func newMinibatcher(t *testing.T, config batcher.MinibatcherConfig) *minibatcherComponents { + logger := 
logging.NewNoopLogger() + blobStore := dinmem.NewBlobStore() + minibatchStore := inmem.NewMinibatchStore(logger) + chainState, err := coremock.NewChainDataMock(mockChainState.Stakes) + assert.NoError(t, err) + state := chainState.GetTotalOperatorState(context.Background(), 0) + dispatcher := dmock.NewDispatcher(state) + ics := &coremock.MockIndexedChainState{} + streamerConfig := batcher.StreamerConfig{ + SRSOrder: 3000, + EncodingRequestTimeout: 5 * time.Second, + EncodingQueueLimit: 10, + TargetNumChunks: 8092, + MaxBlobsToFetchFromStore: 10, + FinalizationBlockDelay: 0, + ChainStateTimeout: 5 * time.Second, + } + encodingWorkerPool := workerpool.New(10) + p, err := makeTestProver() + assert.NoError(t, err) + encoderClient := disperser.NewLocalEncoderClient(p) + asgn := &core.StdAssignmentCoordinator{} + chainState.On("GetCurrentBlockNumber").Return(initialBlock, nil) + metrics := batcher.NewMetrics("9100", logger) + trigger := batcher.NewEncodedSizeNotifier( + make(chan struct{}, 1), + 10*1024*1024, + ) + encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, chainState, encoderClient, asgn, trigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger) + assert.NoError(t, err) + ethClient := &cmock.MockEthClient{} + pool := workerpool.New(int(config.MaxNumConnections)) + m, err := batcher.NewMinibatcher(config, blobStore, minibatchStore, dispatcher, chainState, asgn, encodingStreamer, ethClient, pool, logger) + assert.NoError(t, err) + return &minibatcherComponents{ + minibatcher: m, + blobStore: blobStore, + minibatchStore: minibatchStore, + dispatcher: dispatcher, + chainState: ics, + assignmentCoordinator: asgn, + encodingStreamer: encodingStreamer, + pool: pool, + ethClient: ethClient, + logger: logger, + } +} + +func TestDisperseMinibatch(t *testing.T) { + c := newMinibatcher(t, defaultConfig) + var X, Y fp.Element + X = *X.SetBigInt(big.NewInt(1)) + Y = *Y.SetBigInt(big.NewInt(2)) + c.dispatcher.On("SendBlobsToOperator", 
mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*core.Signature{ + { + G1Point: &core.G1Point{ + G1Affine: &bn254.G1Affine{ + X: X, + Y: Y, + }, + }, + }, + }, nil) + ctx := context.Background() + + blob1 := makeTestBlob([]*core.SecurityParam{{ + QuorumID: 0, + AdversaryThreshold: 80, + ConfirmationThreshold: 100, + }}) + blob2 := makeTestBlob([]*core.SecurityParam{{ + QuorumID: 1, + AdversaryThreshold: 70, + ConfirmationThreshold: 100, + }}) + _, _ = queueBlob(t, ctx, &blob1, c.blobStore) + _, _ = queueBlob(t, ctx, &blob2, c.blobStore) + + // Start the batcher + out := make(chan batcher.EncodingResultOrStatus) + err := c.encodingStreamer.RequestEncoding(ctx, out) + assert.NoError(t, err) + err = c.encodingStreamer.ProcessEncodedBlobs(ctx, <-out) + assert.NoError(t, err) + err = c.encodingStreamer.ProcessEncodedBlobs(ctx, <-out) + assert.NoError(t, err) + count, _ := c.encodingStreamer.EncodedBlobstore.GetEncodedResultSize() + assert.Equal(t, 2, count) + + err = c.minibatcher.HandleSingleBatch(ctx) + assert.NoError(t, err) + assert.NotNil(t, c.minibatcher.BatchID) + assert.Equal(t, c.minibatcher.MinibatchIndex, uint(1)) + assert.Equal(t, c.minibatcher.ReferenceBlockNumber, initialBlock) + + b, err := c.minibatchStore.GetBatch(ctx, c.minibatcher.BatchID) + assert.NoError(t, err) + assert.NotNil(t, b) + assert.Equal(t, c.minibatcher.BatchID, b.ID) + assert.NotNil(t, b.HeaderHash) + assert.NotNil(t, b.CreatedAt) + assert.Equal(t, c.minibatcher.ReferenceBlockNumber, b.ReferenceBlockNumber) + mb, err := c.minibatchStore.GetMinibatch(ctx, c.minibatcher.BatchID, 0) + assert.NoError(t, err) + assert.NotNil(t, mb) + assert.Equal(t, c.minibatcher.BatchID, mb.BatchID) + assert.Equal(t, uint(0), mb.MinibatchIndex) + assert.Len(t, mb.BlobHeaderHashes, 2) + assert.Equal(t, uint64(12800), mb.BatchSize) + assert.Equal(t, c.minibatcher.ReferenceBlockNumber, mb.ReferenceBlockNumber) + + c.pool.StopWait() + c.dispatcher.AssertNumberOfCalls(t, 
"SendBlobsToOperator", 2) + dispersalRequests, err := c.minibatchStore.GetDispersalRequests(ctx, c.minibatcher.BatchID, 0) + assert.NoError(t, err) + assert.Len(t, dispersalRequests, 2) + opIDs := make([]core.OperatorID, 2) + for i, req := range dispersalRequests { + assert.Equal(t, req.BatchID, c.minibatcher.BatchID) + assert.Equal(t, req.MinibatchIndex, uint(0)) + assert.Equal(t, req.NumBlobs, uint(2)) + assert.NotNil(t, req.Socket) + assert.NotNil(t, req.RequestedAt) + opIDs[i] = req.OperatorID + } + assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1}) + + dispersalResponses, err := c.minibatchStore.GetDispersalResponses(ctx, c.minibatcher.BatchID, 0) + assert.NoError(t, err) + assert.Len(t, dispersalResponses, 2) + for _, resp := range dispersalResponses { + assert.Equal(t, resp.BatchID, c.minibatcher.BatchID) + assert.Equal(t, resp.MinibatchIndex, uint(0)) + assert.NotNil(t, resp.RespondedAt) + assert.NoError(t, resp.Error) + assert.Len(t, resp.Signatures, 1) + } +} + +func TestDisperseMinibatchFailure(t *testing.T) { + c := newMinibatcher(t, defaultConfig) + var X, Y fp.Element + X = *X.SetBigInt(big.NewInt(1)) + Y = *Y.SetBigInt(big.NewInt(2)) + c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*core.Signature{ + { + G1Point: &core.G1Point{ + G1Affine: &bn254.G1Affine{ + X: X, + Y: Y, + }, + }, + }, + }, nil) + ctx := context.Background() + + blob1 := makeTestBlob([]*core.SecurityParam{{ + QuorumID: 0, + AdversaryThreshold: 80, + ConfirmationThreshold: 100, + }}) + blob2 := makeTestBlob([]*core.SecurityParam{{ + QuorumID: 1, + AdversaryThreshold: 70, + ConfirmationThreshold: 100, + }}) + _, _ = queueBlob(t, ctx, &blob1, c.blobStore) + _, _ = queueBlob(t, ctx, &blob2, c.blobStore) + + // Start the batcher + out := make(chan batcher.EncodingResultOrStatus) + err := c.encodingStreamer.RequestEncoding(ctx, out) + assert.NoError(t, err) + err = c.encodingStreamer.ProcessEncodedBlobs(ctx, <-out) 
+ assert.NoError(t, err) + err = c.encodingStreamer.ProcessEncodedBlobs(ctx, <-out) + assert.NoError(t, err) + count, _ := c.encodingStreamer.EncodedBlobstore.GetEncodedResultSize() + assert.Equal(t, 2, count) + + err = c.minibatcher.HandleSingleBatch(ctx) + assert.NoError(t, err) + assert.NotNil(t, c.minibatcher.BatchID) + assert.Equal(t, c.minibatcher.MinibatchIndex, uint(1)) + assert.Equal(t, c.minibatcher.ReferenceBlockNumber, initialBlock) + + b, err := c.minibatchStore.GetBatch(ctx, c.minibatcher.BatchID) + assert.NoError(t, err) + assert.NotNil(t, b) + assert.Equal(t, c.minibatcher.BatchID, b.ID) + assert.NotNil(t, b.HeaderHash) + assert.NotNil(t, b.CreatedAt) + assert.Equal(t, c.minibatcher.ReferenceBlockNumber, b.ReferenceBlockNumber) + mb, err := c.minibatchStore.GetMinibatch(ctx, c.minibatcher.BatchID, 0) + assert.NoError(t, err) + assert.NotNil(t, mb) + assert.Equal(t, c.minibatcher.BatchID, mb.BatchID) + assert.Equal(t, uint(0), mb.MinibatchIndex) + assert.Len(t, mb.BlobHeaderHashes, 2) + assert.Equal(t, uint64(12800), mb.BatchSize) + assert.Equal(t, c.minibatcher.ReferenceBlockNumber, mb.ReferenceBlockNumber) + + c.pool.StopWait() + c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 2) + dispersalRequests, err := c.minibatchStore.GetDispersalRequests(ctx, c.minibatcher.BatchID, 0) + assert.NoError(t, err) + assert.Len(t, dispersalRequests, 2) + opIDs := make([]core.OperatorID, 2) + for i, req := range dispersalRequests { + assert.Equal(t, req.BatchID, c.minibatcher.BatchID) + assert.Equal(t, req.MinibatchIndex, uint(0)) + assert.Equal(t, req.NumBlobs, uint(2)) + assert.NotNil(t, req.Socket) + assert.NotNil(t, req.RequestedAt) + opIDs[i] = req.OperatorID + } + assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1}) + + dispersalResponses, err := c.minibatchStore.GetDispersalResponses(ctx, c.minibatcher.BatchID, 0) + assert.NoError(t, err) + assert.Len(t, dispersalResponses, 2) + for _, resp := range dispersalResponses { + 
assert.Equal(t, resp.BatchID, c.minibatcher.BatchID) + assert.Equal(t, resp.MinibatchIndex, uint(0)) + assert.NotNil(t, resp.RespondedAt) + assert.NoError(t, resp.Error) + assert.Len(t, resp.Signatures, 1) + } +} + +func TestSendBlobsToOperatorWithRetries(t *testing.T) { + c := newMinibatcher(t, defaultConfig) + var X, Y fp.Element + X = *X.SetBigInt(big.NewInt(1)) + Y = *Y.SetBigInt(big.NewInt(2)) + sig := &core.Signature{ + G1Point: &core.G1Point{ + G1Affine: &bn254.G1Affine{ + X: X, + Y: Y, + }, + }, + } + ctx := context.Background() + + blob1 := makeTestBlob([]*core.SecurityParam{{ + QuorumID: 0, + AdversaryThreshold: 80, + ConfirmationThreshold: 100, + }}) + blob2 := makeTestBlob([]*core.SecurityParam{{ + QuorumID: 1, + AdversaryThreshold: 70, + ConfirmationThreshold: 100, + }}) + _, _ = queueBlob(t, ctx, &blob1, c.blobStore) + _, _ = queueBlob(t, ctx, &blob2, c.blobStore) + + // Start the batcher + out := make(chan batcher.EncodingResultOrStatus) + err := c.encodingStreamer.RequestEncoding(ctx, out) + assert.NoError(t, err) + err = c.encodingStreamer.ProcessEncodedBlobs(ctx, <-out) + assert.NoError(t, err) + err = c.encodingStreamer.ProcessEncodedBlobs(ctx, <-out) + assert.NoError(t, err) + count, _ := c.encodingStreamer.EncodedBlobstore.GetEncodedResultSize() + assert.Equal(t, 2, count) + batch, err := c.encodingStreamer.CreateMinibatch(ctx) + assert.NoError(t, err) + + c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fail")).Twice() + c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*core.Signature{sig}, nil).Once() + signatures, err := c.minibatcher.SendBlobsToOperatorWithRetries(ctx, batch.EncodedBlobs, batch.BatchHeader, batch.State.IndexedOperators[opId0], opId0, 3) + c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 3) + assert.NoError(t, err) + assert.Len(t, signatures, 1) + + 
c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fail")).Times(3) + c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*core.Signature{sig}, nil).Once() + signatures, err = c.minibatcher.SendBlobsToOperatorWithRetries(ctx, batch.EncodedBlobs, batch.BatchHeader, batch.State.IndexedOperators[opId1], opId1, 3) + c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 6) + assert.ErrorContains(t, err, "failed to send chunks to operator") + assert.Nil(t, signatures) +} + +func TestMinibatcherTooManyPendingRequests(t *testing.T) { + c := newMinibatcher(t, defaultConfig) + ctx := context.Background() + mockWorkerPool := &cmock.MockWorkerpool{} + // minibatcher with mock worker pool + m, err := batcher.NewMinibatcher(defaultConfig, c.blobStore, c.minibatchStore, c.dispatcher, c.minibatcher.ChainState, c.assignmentCoordinator, c.encodingStreamer, c.ethClient, mockWorkerPool, c.logger) + assert.NoError(t, err) + mockWorkerPool.On("WaitingQueueSize").Return(int(defaultConfig.MaxNumConnections + 1)).Once() + err = m.HandleSingleBatch(ctx) + assert.ErrorContains(t, err, "too many pending requests") +} diff --git a/disperser/cmd/batcher/flags/flags.go b/disperser/cmd/batcher/flags/flags.go index 6dddb76cb0..3b0cadbd5f 100644 --- a/disperser/cmd/batcher/flags/flags.go +++ b/disperser/cmd/batcher/flags/flags.go @@ -201,6 +201,27 @@ var ( Required: false, EnvVar: common.PrefixEnvVar(envVarPrefix, "ENABLE_MINIBATCH"), } + MinibatcherPullIntervalFlag = cli.DurationFlag{ + Name: common.PrefixFlag(FlagPrefix, "minibatcher-pull-interval"), + Usage: "Interval at which to pull from the queue and disperse a minibatch. Only used when minibatching is enabled. 
Defaults to 5s.", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "MINIBATCHER_PULL_INTERVAL"), + Value: 5 * time.Second, + } + MaxNodeConnectionsFlag = cli.UintFlag{ + Name: common.PrefixFlag(FlagPrefix, "max-node-connections"), + Usage: "Maximum number of connections to the node. Only used when minibatching is enabled. Defaults to 1024.", + Required: false, + Value: 1024, + EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_NODE_CONNECTIONS"), + } + MaxNumRetriesPerDispersalFlag = cli.UintFlag{ + Name: common.PrefixFlag(FlagPrefix, "max-num-retries-per-dispersal"), + Usage: "Maximum number of retries to disperse a minibatch. Only used when minibatching is enabled. Defaults to 3.", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_NUM_RETRIES_PER_DISPERSAL"), + Value: 3, + } ) var requiredFlags = []cli.Flag{ @@ -234,6 +255,9 @@ var optionalFlags = []cli.Flag{ MaxBlobsToFetchFromStoreFlag, FinalizationBlockDelayFlag, EnableMinibatchFlag, + MinibatcherPullIntervalFlag, + MaxNodeConnectionsFlag, + MaxNumRetriesPerDispersalFlag, } // Flags contains the list of configuration options available to the binary.