Skip to content

Commit

Permalink
Enable minibatcher
Browse files Browse the repository at this point in the history
Refactor batcher/minibatcher creation to use generic interface

Fix integration test

Fix tests

Update batcher interface type cast
Move minibatcher tablename check to earlier init phase

Make minibatcher table name optional

Add info output at startup

Lint

Remove log output

Rename MinibatchStore to Batchstore

Remove generic batch/minibatcher interface

Add orchestrator abstraction

Add metrics

Add finalizer

Refactor and consolidate encoding client/streamer init

Revert batcher to original
Move minibatcher branch logic to main

Move finalizer, encodingstreamer, chain state, assignment mgr into orchestrator

Add batchConfirmer to orchestrator

Move signalLiveness into orchestrator

Add transactionMgr startup to orchestrator

Lint

Lint

Lint

Lint
  • Loading branch information
pschork committed Aug 28, 2024
1 parent 206e951 commit 0b5e119
Show file tree
Hide file tree
Showing 12 changed files with 358 additions and 156 deletions.
10 changes: 9 additions & 1 deletion disperser/batcher/batch_confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type BatchConfirmerConfig struct {
DispersalStatusCheckInterval time.Duration
AttestationTimeout time.Duration
SRSOrder int
NumConnections int
NumConnections uint
MaxNumRetriesPerBlob uint
}

Expand All @@ -46,6 +46,7 @@ type BatchConfirmer struct {

ethClient common.EthClient
logger logging.Logger
Metrics *Metrics
}

func NewBatchConfirmer(
Expand All @@ -62,6 +63,7 @@ func NewBatchConfirmer(
txnManager TxnManager,
minibatcher *Minibatcher,
logger logging.Logger,
metrics *Metrics,
) (*BatchConfirmer, error) {
return &BatchConfirmer{
BatchConfirmerConfig: config,
Expand All @@ -76,6 +78,7 @@ func NewBatchConfirmer(
Transactor: transactor,
TransactionManager: txnManager,
Minibatcher: minibatcher,
Metrics: metrics,

ethClient: ethClient,
logger: logger.With("component", "BatchConfirmer"),
Expand Down Expand Up @@ -285,6 +288,11 @@ func (b *BatchConfirmer) handleFailure(ctx context.Context, batchID uuid.UUID, b
continue
}

if reason == FailNoSignatures {
b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.InsufficientSignatures)
} else {
b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.Failed)
}
numPermanentFailures++
}

Expand Down
7 changes: 3 additions & 4 deletions disperser/batcher/batch_confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
bat "github.com/Layr-Labs/eigenda/disperser/batcher"
batcherinmem "github.com/Layr-Labs/eigenda/disperser/batcher/inmem"
batchermock "github.com/Layr-Labs/eigenda/disperser/batcher/mock"
batmock "github.com/Layr-Labs/eigenda/disperser/batcher/mock"
"github.com/Layr-Labs/eigenda/disperser/common/inmem"
dmock "github.com/Layr-Labs/eigenda/disperser/mock"
"github.com/Layr-Labs/eigenda/encoding"
Expand Down Expand Up @@ -57,7 +56,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents {
dispatcher := dmock.NewDispatcher(state)
blobStore := inmem.NewBlobStore()
ethClient := &cmock.MockEthClient{}
txnManager := batmock.NewTxnManager()
txnManager := batchermock.NewTxnManager()
minibatchStore := batcherinmem.NewMinibatchStore(logger)
encodingWorkerPool := workerpool.New(10)
encoderProver, err = makeTestProver()
Expand All @@ -76,7 +75,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents {
MaxNumConnections: 10,
MaxNumRetriesPerBlob: 2,
MaxNumRetriesPerDispersal: 1,
}, blobStore, minibatchStore, dispatcher, mockChainState, assignmentCoordinator, encodingStreamer, ethClient, pool, logger)
}, blobStore, minibatchStore, dispatcher, encodingStreamer, pool, logger, metrics)
assert.NoError(t, err)

config := bat.BatchConfirmerConfig{
Expand All @@ -88,7 +87,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents {
SRSOrder: 3000,
MaxNumRetriesPerBlob: 2,
}
b, err := bat.NewBatchConfirmer(config, blobStore, minibatchStore, dispatcher, mockChainState, assignmentCoordinator, encodingStreamer, agg, ethClient, transactor, txnManager, minibatcher, logger)
b, err := bat.NewBatchConfirmer(config, blobStore, minibatchStore, dispatcher, mockChainState, assignmentCoordinator, encodingStreamer, agg, ethClient, transactor, txnManager, minibatcher, logger, metrics)
assert.NoError(t, err)

ethClient.On("BlockNumber").Return(uint64(initialBlock), nil)
Expand Down
10 changes: 10 additions & 0 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type QuorumInfo struct {

type TimeoutConfig struct {
EncodingTimeout time.Duration
DispersalTimeout time.Duration
AttestationTimeout time.Duration
ChainReadTimeout time.Duration
ChainWriteTimeout time.Duration
Expand All @@ -64,6 +65,11 @@ type Config struct {

TargetNumChunks uint
MaxBlobsToFetchFromStore int

EnableMinibatch bool
BatchstoreTableName string
MinibatcherConfig MinibatcherConfig
DispersalStatusCheckInterval time.Duration
}

type Batcher struct {
Expand Down Expand Up @@ -105,6 +111,10 @@ func NewBatcher(
metrics *Metrics,
heartbeatChan chan time.Time,
) (*Batcher, error) {
if config.EnableMinibatch {
return nil, errors.New("minibatch is not supported")
}

batchTrigger := NewEncodedSizeNotifier(
make(chan struct{}, 1),
uint64(config.BatchSizeMBLimit)*1024*1024, // convert to bytes
Expand Down
2 changes: 0 additions & 2 deletions disperser/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type batcherComponents struct {
transactor *coremock.MockTransactor
txnManager *batchermock.MockTxnManager
blobStore disperser.BlobStore
encoderClient *disperser.LocalEncoderClient
encodingStreamer *bat.EncodingStreamer
ethClient *cmock.MockEthClient
dispatcher *dmock.Dispatcher
Expand Down Expand Up @@ -153,7 +152,6 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time.
transactor: transactor,
txnManager: txnManager,
blobStore: blobStore,
encoderClient: encoderClient,
encodingStreamer: b.EncodingStreamer,
ethClient: ethClient,
dispatcher: dispatcher,
Expand Down
2 changes: 1 addition & 1 deletion disperser/batcher/batchstore/minibatch_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type MinibatchStore struct {

var _ batcher.MinibatchStore = (*MinibatchStore)(nil)

func NewMinibatchStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *MinibatchStore {
func NewMinibatchStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) batcher.MinibatchStore {
logger.Debugf("creating minibatch store with table %s with TTL: %s", tableName, ttl)
return &MinibatchStore{
dynamoDBClient: dynamoDBClient,
Expand Down
2 changes: 1 addition & 1 deletion disperser/batcher/batchstore/minibatch_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var (
localStackPort = "4566"

dynamoClient *dynamodb.Client
minibatchStore *batchstore.MinibatchStore
minibatchStore batcher.MinibatchStore

UUID = uuid.New()
minibatchTableName = fmt.Sprintf("test-MinibatchStore-%v", UUID)
Expand Down
96 changes: 24 additions & 72 deletions disperser/batcher/minibatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package batcher

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -32,92 +31,53 @@ type BatchState struct {
type Minibatcher struct {
MinibatcherConfig

BlobStore disperser.BlobStore
MinibatchStore MinibatchStore
Dispatcher disperser.Dispatcher
ChainState core.IndexedChainState
AssignmentCoordinator core.AssignmentCoordinator
EncodingStreamer *EncodingStreamer
Pool common.WorkerPool
BlobStore disperser.BlobStore
MinibatchStore MinibatchStore
Dispatcher disperser.Dispatcher
EncodingStreamer *EncodingStreamer
Pool common.WorkerPool

// local state
Batches map[uuid.UUID]*BatchState
ReferenceBlockNumber uint
CurrentBatchID uuid.UUID
MinibatchIndex uint

ethClient common.EthClient
logger logging.Logger
Metrics *Metrics
logger logging.Logger
}

func NewMinibatcher(
config MinibatcherConfig,
blobStore disperser.BlobStore,
minibatchStore MinibatchStore,
dispatcher disperser.Dispatcher,
chainState core.IndexedChainState,
assignmentCoordinator core.AssignmentCoordinator,
encodingStreamer *EncodingStreamer,
ethClient common.EthClient,
workerpool common.WorkerPool,
logger logging.Logger,
metrics *Metrics,
) (*Minibatcher, error) {
return &Minibatcher{
MinibatcherConfig: config,
BlobStore: blobStore,
MinibatchStore: minibatchStore,
Dispatcher: dispatcher,
ChainState: chainState,
AssignmentCoordinator: assignmentCoordinator,
EncodingStreamer: encodingStreamer,
Pool: workerpool,
MinibatcherConfig: config,
BlobStore: blobStore,
MinibatchStore: minibatchStore,
Dispatcher: dispatcher,
EncodingStreamer: encodingStreamer,
Pool: workerpool,

Batches: make(map[uuid.UUID]*BatchState),
ReferenceBlockNumber: 0,
CurrentBatchID: uuid.Nil,
MinibatchIndex: 0,

ethClient: ethClient,
logger: logger.With("component", "Minibatcher"),
logger: logger.With("component", "Minibatcher"),
Metrics: metrics,
}, nil
}

func (b *Minibatcher) Start(ctx context.Context) error {
err := b.ChainState.Start(ctx)
if err != nil {
return err
}
// Wait for few seconds for indexer to index blockchain
// This won't be needed when we switch to using Graph node
time.Sleep(indexerWarmupDelay)
go func() {
ticker := time.NewTicker(b.PullInterval)
defer ticker.Stop()
cancelFuncs := make([]context.CancelFunc, 0)
for {
select {
case <-ctx.Done():
for _, cancel := range cancelFuncs {
cancel()
}
return
case <-ticker.C:
cancel, err := b.HandleSingleMinibatch(ctx)
if err != nil {
if errors.Is(err, errNoEncodedResults) {
b.logger.Warn("no encoded results to make a batch with")
} else {
b.logger.Error("failed to process a batch", "err", err)
}
}
if cancel != nil {
cancelFuncs = append(cancelFuncs, cancel)
}
}
}
}()

return nil
func (b *Minibatcher) RecoverState(ctx context.Context) error {
//TODO: Implement minibatch recovery
return fmt.Errorf("minibatch state recovery not implemented")
}

func (b *Minibatcher) PopBatchState(batchID uuid.UUID) *BatchState {
Expand All @@ -136,7 +96,7 @@ func (b *Minibatcher) handleFailure(ctx context.Context, blobMetadatas []*disper
b.EncodingStreamer.RemoveEncodedBlob(metadata)
retry, err := b.BlobStore.HandleBlobFailure(ctx, metadata, b.MaxNumRetriesPerBlob)
if err != nil {
b.logger.Error("HandleSingleBatch: error handling blob failure", "err", err)
b.logger.Error("error handling blobstore blob failure", "err", err)
// Append the error
result = multierror.Append(result, err)
}
Expand All @@ -153,6 +113,7 @@ func (b *Minibatcher) handleFailure(ctx context.Context, blobMetadatas []*disper

func (b *Minibatcher) HandleSingleMinibatch(ctx context.Context) (context.CancelFunc, error) {
log := b.logger

// If too many dispersal requests are pending, skip an iteration
if pending := b.Pool.WaitingQueueSize(); pending > int(b.MaxNumConnections) {
return nil, fmt.Errorf("too many pending requests %d with max number of connections %d. skipping minibatch iteration", pending, b.MaxNumConnections)
Expand Down Expand Up @@ -218,16 +179,7 @@ func (b *Minibatcher) HandleSingleMinibatch(ctx context.Context) (context.Cancel
err := b.createBlobMinibatchMappings(ctx, b.CurrentBatchID, b.MinibatchIndex, minibatch.BlobMetadata, minibatch.BlobHeaders)
storeMappingsChan <- err
}()

// Disperse the minibatch to operators in all quorums
// If an operator doesn't have any bundles, it won't receive any chunks but it will still receive blob headers
operatorsAllQuorums, err := b.ChainState.GetIndexedOperators(ctx, b.ReferenceBlockNumber)
if err != nil {
cancelDispersal()
_ = b.handleFailure(ctx, minibatch.BlobMetadata, FailReason("error getting operator state for all quorums"))
return nil, fmt.Errorf("error getting operator state for all quorums: %w", err)
}
b.DisperseBatch(dispersalCtx, operatorsAllQuorums, minibatch.EncodedBlobs, minibatch.BatchHeader, b.CurrentBatchID, b.MinibatchIndex)
b.DisperseBatch(dispersalCtx, minibatch.State, minibatch.EncodedBlobs, minibatch.BatchHeader, b.CurrentBatchID, b.MinibatchIndex)
log.Debug("DisperseBatch took", "duration", time.Since(stageTimer).String())

h, err := minibatch.State.OperatorState.Hash()
Expand All @@ -252,8 +204,8 @@ func (b *Minibatcher) HandleSingleMinibatch(ctx context.Context) (context.Cancel
return cancelDispersal, nil
}

func (b *Minibatcher) DisperseBatch(ctx context.Context, operators map[core.OperatorID]*core.IndexedOperatorInfo, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, batchID uuid.UUID, minibatchIndex uint) {
for id, op := range operators {
func (b *Minibatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, batchID uuid.UUID, minibatchIndex uint) {
for id, op := range state.IndexedOperators {
opInfo := op
opID := id
req := &MinibatchDispersal{
Expand Down
Loading

0 comments on commit 0b5e119

Please sign in to comment.