Skip to content

Commit

Permalink
Move finalizer, encodingstreamer, chain state, assignment mgr into or…
Browse files Browse the repository at this point in the history
…chestrator
  • Loading branch information
pschork committed Aug 20, 2024
1 parent 980ac9d commit e4700f3
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 122 deletions.
3 changes: 1 addition & 2 deletions disperser/batcher/batch_confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents {
dispatcher := dmock.NewDispatcher(state)
blobStore := inmem.NewBlobStore()
ethClient := &cmock.MockEthClient{}
finalizer := batmock.NewFinalizer()
txnManager := batmock.NewTxnManager()
minibatchStore := batcherinmem.NewMinibatchStore(logger)
encodingWorkerPool := workerpool.New(10)
Expand All @@ -77,7 +76,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents {
MaxNumConnections: 10,
MaxNumRetriesPerBlob: 2,
MaxNumRetriesPerDispersal: 1,
}, blobStore, minibatchStore, dispatcher, mockChainState, assignmentCoordinator, encodingStreamer, ethClient, finalizer, pool, logger, metrics, handleMinibatchLivenessChan)
}, blobStore, minibatchStore, dispatcher, encodingStreamer, pool, logger, metrics, handleMinibatchLivenessChan)
assert.NoError(t, err)

config := bat.BatchConfirmerConfig{
Expand Down
100 changes: 13 additions & 87 deletions disperser/batcher/minibatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package batcher

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -32,13 +31,11 @@ type BatchState struct {
type Minibatcher struct {
MinibatcherConfig

BlobStore disperser.BlobStore
MinibatchStore MinibatchStore
Dispatcher disperser.Dispatcher
ChainState core.IndexedChainState
AssignmentCoordinator core.AssignmentCoordinator
EncodingStreamer *EncodingStreamer
Pool common.WorkerPool
BlobStore disperser.BlobStore
MinibatchStore MinibatchStore
Dispatcher disperser.Dispatcher
EncodingStreamer *EncodingStreamer
Pool common.WorkerPool

// local state
Batches map[uuid.UUID]*BatchState
Expand All @@ -47,111 +44,40 @@ type Minibatcher struct {
MinibatchIndex uint
HeartbeatChan chan time.Time

Metrics *Metrics
ethClient common.EthClient
finalizer Finalizer
logger logging.Logger
Metrics *Metrics
logger logging.Logger
}

func NewMinibatcher(
config MinibatcherConfig,
blobStore disperser.BlobStore,
minibatchStore MinibatchStore,
dispatcher disperser.Dispatcher,
chainState core.IndexedChainState,
assignmentCoordinator core.AssignmentCoordinator,
encodingStreamer *EncodingStreamer,
ethClient common.EthClient,
finalizer Finalizer,
workerpool common.WorkerPool,
logger logging.Logger,
metrics *Metrics,
heartbeatChan chan time.Time,
) (*Minibatcher, error) {
return &Minibatcher{
MinibatcherConfig: config,
BlobStore: blobStore,
MinibatchStore: minibatchStore,
Dispatcher: dispatcher,
ChainState: chainState,
AssignmentCoordinator: assignmentCoordinator,
EncodingStreamer: encodingStreamer,
Pool: workerpool,
MinibatcherConfig: config,
BlobStore: blobStore,
MinibatchStore: minibatchStore,
Dispatcher: dispatcher,
EncodingStreamer: encodingStreamer,
Pool: workerpool,

Batches: make(map[uuid.UUID]*BatchState),
ReferenceBlockNumber: 0,
CurrentBatchID: uuid.Nil,
MinibatchIndex: 0,

ethClient: ethClient,
finalizer: finalizer,
logger: logger.With("component", "Minibatcher"),
HeartbeatChan: heartbeatChan,
Metrics: metrics,
}, nil
}

func (b *Minibatcher) Start(ctx context.Context) error {
err := b.ChainState.Start(ctx)
if err != nil {
return err
}

// Wait for few seconds for indexer to index blockchain
// This won't be needed when we switch to using Graph node
time.Sleep(indexerWarmupDelay)
err = b.EncodingStreamer.Start(ctx)
if err != nil {
return err
}
batchTrigger := b.EncodingStreamer.EncodedSizeNotifier

b.finalizer.Start(ctx)

go func() {
ticker := time.NewTicker(b.PullInterval)
defer ticker.Stop()
cancelFuncs := make([]context.CancelFunc, 0)
for {
select {
case <-ctx.Done():
for _, cancel := range cancelFuncs {
cancel()
}
return
case <-ticker.C:
cancel, err := b.HandleSingleMinibatch(ctx)
if err != nil {
if errors.Is(err, errNoEncodedResults) {
b.logger.Warn("no encoded results to construct minibatch")
} else {
b.logger.Error("failed to process minibatch", "err", err)
}
}
if cancel != nil {
cancelFuncs = append(cancelFuncs, cancel)
}
case <-batchTrigger.Notify:
ticker.Stop()

cancel, err := b.HandleSingleMinibatch(ctx)
if err != nil {
if errors.Is(err, errNoEncodedResults) {
b.logger.Warn("no encoded results to construct minibatch")
} else {
b.logger.Error("failed to process minibatch", "err", err)
}
}
if cancel != nil {
cancelFuncs = append(cancelFuncs, cancel)
}
ticker.Reset(b.PullInterval)
}
}
}()
return nil
}

func (b *Minibatcher) RecoverState(ctx context.Context) error {
//TODO: Implement minibatch recovery
return fmt.Errorf("minibatch state recovery not implemented")
Expand Down
50 changes: 19 additions & 31 deletions disperser/batcher/minibatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@ import (
"time"

cmock "github.com/Layr-Labs/eigenda/common/mock"
commonmock "github.com/Layr-Labs/eigenda/common/mock"
"github.com/Layr-Labs/eigenda/core"
coremock "github.com/Layr-Labs/eigenda/core/mock"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/batcher"
"github.com/Layr-Labs/eigenda/disperser/batcher/inmem"
batchermock "github.com/Layr-Labs/eigenda/disperser/batcher/mock"
dinmem "github.com/Layr-Labs/eigenda/disperser/common/inmem"
dmock "github.com/Layr-Labs/eigenda/disperser/mock"
"github.com/Layr-Labs/eigensdk-go/logging"
Expand Down Expand Up @@ -50,17 +48,15 @@ const (
)

type minibatcherComponents struct {
minibatcher *batcher.Minibatcher
blobStore disperser.BlobStore
minibatchStore batcher.MinibatchStore
dispatcher *dmock.Dispatcher
chainState *core.IndexedOperatorState
assignmentCoordinator core.AssignmentCoordinator
encodingStreamer *batcher.EncodingStreamer
pool *workerpool.WorkerPool
ethClient *commonmock.MockEthClient
logger logging.Logger
metrics *batcher.Metrics
minibatcher *batcher.Minibatcher
blobStore disperser.BlobStore
minibatchStore batcher.MinibatchStore
dispatcher *dmock.Dispatcher
chainState *core.IndexedOperatorState
encodingStreamer *batcher.EncodingStreamer
pool *workerpool.WorkerPool
logger logging.Logger
metrics *batcher.Metrics
}

func newMinibatcher(t *testing.T, config batcher.MinibatcherConfig) *minibatcherComponents {
Expand All @@ -84,7 +80,6 @@ func newMinibatcher(t *testing.T, config batcher.MinibatcherConfig) *minibatcher
p, err := makeTestProver()
assert.NoError(t, err)
encoderClient := disperser.NewLocalEncoderClient(p)
finalizer := batchermock.NewFinalizer()
asgn := &core.StdAssignmentCoordinator{}
chainState.On("GetCurrentBlockNumber").Return(initialBlock, nil)
metrics := batcher.NewMetrics("9100", logger)
Expand All @@ -94,24 +89,18 @@ func newMinibatcher(t *testing.T, config batcher.MinibatcherConfig) *minibatcher
)
encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, chainState, encoderClient, asgn, trigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger)
assert.NoError(t, err)
ethClient := &cmock.MockEthClient{}
pool := workerpool.New(int(config.MaxNumConnections))
m, err := batcher.NewMinibatcher(config, blobStore, minibatchStore, dispatcher, chainState, asgn, encodingStreamer, ethClient, finalizer, pool, logger, metrics, handleMinibatchLivenessChan)
assert.NoError(t, err)
ics, err := chainState.GetIndexedOperatorState(context.Background(), 0, []core.QuorumID{0, 1})
m, err := batcher.NewMinibatcher(config, blobStore, minibatchStore, dispatcher, encodingStreamer, pool, logger, metrics, handleMinibatchLivenessChan)
assert.NoError(t, err)
return &minibatcherComponents{
minibatcher: m,
blobStore: blobStore,
minibatchStore: minibatchStore,
dispatcher: dispatcher,
chainState: ics,
assignmentCoordinator: asgn,
encodingStreamer: encodingStreamer,
pool: pool,
ethClient: ethClient,
logger: logger,
metrics: metrics,
minibatcher: m,
blobStore: blobStore,
minibatchStore: minibatchStore,
dispatcher: dispatcher,
encodingStreamer: encodingStreamer,
pool: pool,
logger: logger,
metrics: metrics,
}
}

Expand Down Expand Up @@ -531,8 +520,7 @@ func TestMinibatcherTooManyPendingRequests(t *testing.T) {
ctx := context.Background()
mockWorkerPool := &cmock.MockWorkerpool{}
// minibatcher with mock worker pool
finalizer := batchermock.NewFinalizer()
m, err := batcher.NewMinibatcher(defaultConfig, c.blobStore, c.minibatchStore, c.dispatcher, c.minibatcher.ChainState, c.assignmentCoordinator, c.encodingStreamer, c.ethClient, finalizer, mockWorkerPool, c.logger, c.metrics, handleMinibatchLivenessChan)
m, err := batcher.NewMinibatcher(defaultConfig, c.blobStore, c.minibatchStore, c.dispatcher, c.encodingStreamer, mockWorkerPool, c.logger, c.metrics, handleMinibatchLivenessChan)
assert.NoError(t, err)
mockWorkerPool.On("WaitingQueueSize").Return(int(defaultConfig.MaxNumConnections + 1)).Once()
_, err = m.HandleSingleMinibatch(ctx)
Expand Down
59 changes: 57 additions & 2 deletions disperser/batcher/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package batcher

import (
"context"
"errors"
"time"

"github.com/Layr-Labs/eigenda/common"
Expand Down Expand Up @@ -74,7 +75,7 @@ func NewOrchestrator(
}

var miniBatcher *Minibatcher
miniBatcher, err = NewMinibatcher(config.MinibatcherConfig, queue, minibatchStore, dispatcher, chainState, assignmentCoordinator, encodingStreamer, ethClient, finalizer, encodingWorkerPool, logger, metrics, heartbeatChan)
miniBatcher, err = NewMinibatcher(config.MinibatcherConfig, queue, minibatchStore, dispatcher, encodingStreamer, encodingWorkerPool, logger, metrics, heartbeatChan)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -105,9 +106,63 @@ func NewOrchestrator(
}

func (o *Orchestrator) Start(ctx context.Context) error {
err := o.MiniBatcher.Start(ctx)
err := o.ChainState.Start(ctx)
if err != nil {
return err
}

// Wait for few seconds for indexer to index blockchain
// This won't be needed when we switch to using Graph node
time.Sleep(indexerWarmupDelay)

err = o.EncodingStreamer.Start(ctx)
if err != nil {
return err
}
batchTrigger := o.EncodingStreamer.EncodedSizeNotifier

o.finalizer.Start(ctx)

go func() {
ticker := time.NewTicker(o.PullInterval)
defer ticker.Stop()
cancelFuncs := make([]context.CancelFunc, 0)
for {
select {
case <-ctx.Done():
for _, cancel := range cancelFuncs {
cancel()
}
return
case <-ticker.C:
cancel, err := o.MiniBatcher.HandleSingleMinibatch(ctx)
if err != nil {
if errors.Is(err, errNoEncodedResults) {
o.logger.Warn("no encoded results to construct minibatch")
} else {
o.logger.Error("failed to process minibatch", "err", err)
}
}
if cancel != nil {
cancelFuncs = append(cancelFuncs, cancel)
}
case <-batchTrigger.Notify:
ticker.Stop()

cancel, err := o.MiniBatcher.HandleSingleMinibatch(ctx)
if err != nil {
if errors.Is(err, errNoEncodedResults) {
o.logger.Warn("no encoded results to construct minibatch")
} else {
o.logger.Error("failed to process minibatch", "err", err)
}
}
if cancel != nil {
cancelFuncs = append(cancelFuncs, cancel)
}
ticker.Reset(o.PullInterval)
}
}
}()
return nil
}

0 comments on commit e4700f3

Please sign in to comment.