diff --git a/disperser/batcher/inmem/minibatch_store.go b/disperser/batcher/inmem/minibatch_store.go index 6a33d70076..132d090a67 100644 --- a/disperser/batcher/inmem/minibatch_store.go +++ b/disperser/batcher/inmem/minibatch_store.go @@ -2,7 +2,8 @@ package inmem import ( "context" - "fmt" + "errors" + "sort" "sync" "github.com/Layr-Labs/eigenda/core" @@ -11,6 +12,8 @@ import ( "github.com/google/uuid" ) +var BatchNotFound = errors.New("batch not found") + type minibatchStore struct { // BatchRecords maps batch IDs to batch records BatchRecords map[uuid.UUID]*batcher.BatchRecord @@ -53,11 +56,39 @@ func (m *minibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batc b, ok := m.BatchRecords[batchID] if !ok { - return nil, fmt.Errorf("batch not found") + return nil, BatchNotFound } return b, nil } +func (m *minibatchStore) GetBatchesByStatus(ctx context.Context, status batcher.BatchStatus) ([]*batcher.BatchRecord, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + var batches []*batcher.BatchRecord + for _, b := range m.BatchRecords { + if b.Status == status { + batches = append(batches, b) + } + } + sort.Slice(batches, func(i, j int) bool { + return batches[i].CreatedAt.Before(batches[j].CreatedAt) + }) + return batches, nil +} + +func (m *minibatchStore) UpdateBatchStatus(ctx context.Context, batchID uuid.UUID, status batcher.BatchStatus) error { + m.mu.Lock() + defer m.mu.Unlock() + + b, ok := m.BatchRecords[batchID] + if !ok { + return BatchNotFound + } + b.Status = status + return nil +} + func (m *minibatchStore) PutMinibatch(ctx context.Context, minibatch *batcher.MinibatchRecord) error { m.mu.Lock() defer m.mu.Unlock() @@ -75,11 +106,30 @@ func (m *minibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, mi defer m.mu.RUnlock() if _, ok := m.MinibatchRecords[batchID]; !ok { - return nil, nil + return nil, BatchNotFound } return m.MinibatchRecords[batchID][minibatchIndex], nil } +func (m *minibatchStore) GetMinibatches(ctx context.Context, batchID uuid.UUID) ([]*batcher.MinibatchRecord, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + if _, ok := m.MinibatchRecords[batchID]; !ok { + return nil, nil + } + + res := make([]*batcher.MinibatchRecord, 0, len(m.MinibatchRecords[batchID])) + for _, minibatch := range m.MinibatchRecords[batchID] { + res = append(res, minibatch) + } + sort.Slice(res, func(i, j int) bool { + return res[i].MinibatchIndex < res[j].MinibatchIndex + }) + + return res, nil +} + func (m *minibatchStore) PutDispersalRequest(ctx context.Context, request *batcher.DispersalRequest) error { m.mu.Lock() defer m.mu.Unlock() @@ -109,7 +159,7 @@ func (m *minibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.U m.mu.RLock() defer m.mu.RUnlock() - requests, err := m.GetDispersalRequests(ctx, batchID, minibatchIndex) + requests, err := m.GetMinibatchDispersalRequests(ctx, batchID, minibatchIndex) if err != nil { return nil, err } @@ -121,12 +171,12 @@ func (m *minibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.U return nil, nil } -func (m *minibatchStore) GetDispersalRequests(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalRequest, error) { +func (m *minibatchStore) GetMinibatchDispersalRequests(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalRequest, error) { m.mu.RLock() defer m.mu.RUnlock() if _, ok := m.DispersalRequests[batchID]; !ok { - return nil, nil + return nil, BatchNotFound } return m.DispersalRequests[batchID][minibatchIndex], nil @@ -161,7 +211,7 @@ func (m *minibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid. m.mu.RLock() defer m.mu.RUnlock() - responses, err := m.GetDispersalResponses(ctx, batchID, minibatchIndex) + responses, err := m.GetMinibatchDispersalResponses(ctx, batchID, minibatchIndex) if err != nil { return nil, err } @@ -173,20 +223,85 @@ func (m *minibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid. return nil, nil } -func (m *minibatchStore) GetDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalResponse, error) { +func (m *minibatchStore) GetMinibatchDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalResponse, error) { m.mu.RLock() defer m.mu.RUnlock() if _, ok := m.DispersalResponses[batchID]; !ok { - return nil, nil + return nil, BatchNotFound } return m.DispersalResponses[batchID][minibatchIndex], nil } -func (m *minibatchStore) GetPendingBatch(ctx context.Context) (*batcher.BatchRecord, error) { +func (m *minibatchStore) GetLatestFormedBatch(ctx context.Context) (batch *batcher.BatchRecord, minibatches []*batcher.MinibatchRecord, err error) { m.mu.RLock() defer m.mu.RUnlock() - return nil, nil + batches, err := m.GetBatchesByStatus(ctx, batcher.BatchStatusFormed) + if err != nil { + return nil, nil, err + } + if len(batches) == 0 { + return nil, nil, nil + } + + batch = batches[0] + minibatches, err = m.GetMinibatches(ctx, batches[0].ID) + if err != nil { + return nil, nil, err + } + + return batch, minibatches, nil +} + +func (m *minibatchStore) getDispersals(ctx context.Context, batchID uuid.UUID) ([]*batcher.DispersalRequest, []*batcher.DispersalResponse, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + if _, ok := m.DispersalRequests[batchID]; !ok { + return nil, nil, BatchNotFound + } + + if _, ok := m.DispersalResponses[batchID]; !ok { + return nil, nil, BatchNotFound + } + + requests := make([]*batcher.DispersalRequest, 0) + for _, reqs := range m.DispersalRequests[batchID] { + requests = append(requests, reqs...) + } + + responses := make([]*batcher.DispersalResponse, 0) + for _, resp := range m.DispersalResponses[batchID] { + responses = append(responses, resp...) + } + + return requests, responses, nil +} + +func (m *minibatchStore) BatchDispersed(ctx context.Context, batchID uuid.UUID) (bool, error) { + dispersed := true + requests, responses, err := m.getDispersals(ctx, batchID) + if err != nil { + return false, err + } + + if len(requests) == 0 || len(responses) == 0 { + return false, nil + } + + if len(requests) != len(responses) { + m.logger.Info("number of minibatch dispersal requests does not match the number of responses", "batchID", batchID, "numRequests", len(requests), "numResponses", len(responses)) + return false, nil + } + + for _, resp := range responses { + if resp.RespondedAt.IsZero() { + dispersed = false + m.logger.Info("response pending", "batchID", batchID, "minibatchIndex", resp.MinibatchIndex, "operatorID", resp.OperatorID.Hex()) + } + } + + return dispersed, nil } diff --git a/disperser/batcher/inmem/minibatch_store_test.go b/disperser/batcher/inmem/minibatch_store_test.go index a6693c1118..b9e2f02f44 100644 --- a/disperser/batcher/inmem/minibatch_store_test.go +++ b/disperser/batcher/inmem/minibatch_store_test.go @@ -84,7 +84,7 @@ func TestPutDispersalRequest(t *testing.T) { err = s.PutDispersalRequest(ctx, req2) assert.NoError(t, err) - r, err := s.GetDispersalRequests(ctx, id, minibatchIndex) + r, err := s.GetMinibatchDispersalRequests(ctx, id, minibatchIndex) assert.NoError(t, err) assert.Len(t, r, 2) assert.Equal(t, req1, r[0]) @@ -136,7 +136,7 @@ func TestPutDispersalResponse(t *testing.T) { err = s.PutDispersalResponse(ctx, resp2) assert.NoError(t, err) - r, err := s.GetDispersalResponses(ctx, id, minibatchIndex) + r, err := s.GetMinibatchDispersalResponses(ctx, id, minibatchIndex) assert.NoError(t, err) assert.Len(t, r, 2) diff --git a/disperser/batcher/minibatch_store.go b/disperser/batcher/minibatch_store.go index 1409b7ca07..d1e3ed24c1 100644 --- a/disperser/batcher/minibatch_store.go +++ b/disperser/batcher/minibatch_store.go @@ -9,10 +9,30 @@ import ( "github.com/google/uuid" ) +type BatchStatus uint + +// Pending: the batch has been created and minibatches are being added. There can be only one pending batch at a time. +// Formed: the batch has been formed and no more minibatches can be added. Implies that all minibatch records and dispersal request records have been created. +// +// Attested: the batch has been attested. +// Failed: the batch has failed. +// +// The batch lifecycle is as follows: +// Pending -> Formed -> Attested +// \ / +// \-> Failed <-/ +const ( + BatchStatusPending BatchStatus = iota + BatchStatusFormed + BatchStatusAttested + BatchStatusFailed +) + type BatchRecord struct { ID uuid.UUID CreatedAt time.Time ReferenceBlockNumber uint + Status BatchStatus HeaderHash [32]byte AggregatePubKey *core.G2Point AggregateSignature *core.Signature @@ -46,13 +66,21 @@ type DispersalResponse struct { type MinibatchStore interface { PutBatch(ctx context.Context, batch *BatchRecord) error GetBatch(ctx context.Context, batchID uuid.UUID) (*BatchRecord, error) + GetBatchesByStatus(ctx context.Context, status BatchStatus) ([]*BatchRecord, error) + UpdateBatchStatus(ctx context.Context, batchID uuid.UUID, status BatchStatus) error PutMinibatch(ctx context.Context, minibatch *MinibatchRecord) error GetMinibatch(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*MinibatchRecord, error) + GetMinibatches(ctx context.Context, batchID uuid.UUID) ([]*MinibatchRecord, error) PutDispersalRequest(ctx context.Context, request *DispersalRequest) error GetDispersalRequest(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*DispersalRequest, error) - GetDispersalRequests(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*DispersalRequest, error) + GetMinibatchDispersalRequests(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*DispersalRequest, error) PutDispersalResponse(ctx context.Context, response *DispersalResponse) error GetDispersalResponse(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*DispersalResponse, error) - GetDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*DispersalResponse, error) - GetPendingBatch(ctx context.Context) (*BatchRecord, error) + GetMinibatchDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*DispersalResponse, error) + + // GetLatestFormedBatch returns the latest batch that has been formed. + // If there is no formed batch, it returns nil. + // It also returns the minibatches that belong to the batch in the ascending order of minibatch index. + GetLatestFormedBatch(ctx context.Context) (batch *BatchRecord, minibatches []*MinibatchRecord, err error) + BatchDispersed(ctx context.Context, batchID uuid.UUID) (bool, error) } diff --git a/disperser/batcher/minibatcher.go b/disperser/batcher/minibatcher.go index a00d054199..c6c83e2a74 100644 --- a/disperser/batcher/minibatcher.go +++ b/disperser/batcher/minibatcher.go @@ -63,8 +63,10 @@ func NewMinibatcher( AssignmentCoordinator: assignmentCoordinator, EncodingStreamer: encodingStreamer, Pool: workerpool, - ReferenceBlockNumber: 0, - MinibatchIndex: 0, + + ReferenceBlockNumber: 0, + BatchID: uuid.Nil, + MinibatchIndex: 0, ethClient: ethClient, logger: logger.With("component", "Minibatcher"), @@ -138,7 +140,18 @@ func (b *Minibatcher) HandleSingleBatch(ctx context.Context) error { } log.Debug("CreateMinibatch took", "duration", time.Since(stageTimer).String()) + // Processing new full batch if b.ReferenceBlockNumber < batch.BatchHeader.ReferenceBlockNumber { + // Update status of the previous batch + if b.BatchID != uuid.Nil { + err = b.MinibatchStore.UpdateBatchStatus(ctx, b.BatchID, BatchStatusFormed) + if err != nil { + _ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error updating batch status")) + return fmt.Errorf("error updating batch status: %w", err) + } + } + + // Create new batch b.BatchID, err = uuid.NewV7() if err != nil { _ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error generating batch UUID")) @@ -212,22 +225,24 @@ func (b *Minibatcher) DisperseBatch(ctx context.Context, state *core.IndexedOper for id, op := range state.IndexedOperators { opInfo := op opID := id + req := &DispersalRequest{ + BatchID: batchID, + MinibatchIndex: minibatchIndex, + OperatorID: opID, + Socket: op.Socket, + NumBlobs: uint(len(blobs)), + RequestedAt: time.Now().UTC(), + } + err := b.MinibatchStore.PutDispersalRequest(ctx, req) + if err != nil { + b.logger.Error("failed to put dispersal request", "err", err) + continue + } b.Pool.Submit(func() { - req := &DispersalRequest{ - BatchID: batchID, - MinibatchIndex: minibatchIndex, - OperatorID: opID, - Socket: op.Socket, - NumBlobs: uint(len(blobs)), - RequestedAt: time.Now().UTC(), - } - err := b.MinibatchStore.PutDispersalRequest(ctx, req) + signatures, err := b.SendBlobsToOperatorWithRetries(ctx, blobs, batchHeader, opInfo, opID, int(b.MaxNumRetriesPerDispersal)) if err != nil { - b.logger.Error("failed to put dispersal request", "err", err) - return + b.logger.Errorf("failed to send blobs to operator %s: %v", opID.Hex(), err) } - signatures, err := b.SendBlobsToOperatorWithRetries(ctx, blobs, batchHeader, opInfo, opID, int(b.MaxNumRetriesPerDispersal)) - // Update the minibatch state err = b.MinibatchStore.PutDispersalResponse(ctx, &DispersalResponse{ DispersalRequest: *req, diff --git a/disperser/batcher/minibatcher_test.go b/disperser/batcher/minibatcher_test.go index 0059d3815a..400b5120ca 100644 --- a/disperser/batcher/minibatcher_test.go +++ b/disperser/batcher/minibatcher_test.go @@ -171,7 +171,7 @@ func TestDisperseMinibatch(t *testing.T) { c.pool.StopWait() c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 2) - dispersalRequests, err := c.minibatchStore.GetDispersalRequests(ctx, c.minibatcher.BatchID, 0) + dispersalRequests, err := c.minibatchStore.GetMinibatchDispersalRequests(ctx, c.minibatcher.BatchID, 0) assert.NoError(t, err) assert.Len(t, dispersalRequests, 2) opIDs := make([]core.OperatorID, 2) @@ -185,7 +185,7 @@ func TestDisperseMinibatch(t *testing.T) { } assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1}) - dispersalResponses, err := c.minibatchStore.GetDispersalResponses(ctx, c.minibatcher.BatchID, 0) + dispersalResponses, err := c.minibatchStore.GetMinibatchDispersalResponses(ctx, c.minibatcher.BatchID, 0) assert.NoError(t, err) assert.Len(t, dispersalResponses, 2) for _, resp := range dispersalResponses { @@ -262,7 +262,7 @@ func TestDisperseMinibatchFailure(t *testing.T) { c.pool.StopWait() c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 2) - dispersalRequests, err := c.minibatchStore.GetDispersalRequests(ctx, c.minibatcher.BatchID, 0) + dispersalRequests, err := c.minibatchStore.GetMinibatchDispersalRequests(ctx, c.minibatcher.BatchID, 0) assert.NoError(t, err) assert.Len(t, dispersalRequests, 2) opIDs := make([]core.OperatorID, 2) @@ -276,7 +276,7 @@ func TestDisperseMinibatchFailure(t *testing.T) { } assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1}) - dispersalResponses, err := c.minibatchStore.GetDispersalResponses(ctx, c.minibatcher.BatchID, 0) + dispersalResponses, err := c.minibatchStore.GetMinibatchDispersalResponses(ctx, c.minibatcher.BatchID, 0) assert.NoError(t, err) assert.Len(t, dispersalResponses, 2) for _, resp := range dispersalResponses {