From 1e181098a7963d891809d42dbfc0f223b73c0597 Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Tue, 20 Aug 2024 13:36:45 -0700 Subject: [PATCH] Move finalizer, encodingstreamer, chain state, assignment mgr into orchestrator --- disperser/batcher/batch_confirmer_test.go | 3 +- disperser/batcher/minibatcher.go | 100 +++------------------- disperser/batcher/minibatcher_test.go | 29 +++---- disperser/batcher/orchestrator.go | 59 ++++++++++++- 4 files changed, 81 insertions(+), 110 deletions(-) diff --git a/disperser/batcher/batch_confirmer_test.go b/disperser/batcher/batch_confirmer_test.go index 866fae98ed..cff1e178a6 100644 --- a/disperser/batcher/batch_confirmer_test.go +++ b/disperser/batcher/batch_confirmer_test.go @@ -57,7 +57,6 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents { dispatcher := dmock.NewDispatcher(state) blobStore := inmem.NewBlobStore() ethClient := &cmock.MockEthClient{} - finalizer := batmock.NewFinalizer() txnManager := batmock.NewTxnManager() minibatchStore := batcherinmem.NewMinibatchStore(logger) encodingWorkerPool := workerpool.New(10) @@ -77,7 +76,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents { MaxNumConnections: 10, MaxNumRetriesPerBlob: 2, MaxNumRetriesPerDispersal: 1, - }, blobStore, minibatchStore, dispatcher, mockChainState, assignmentCoordinator, encodingStreamer, ethClient, finalizer, pool, logger, metrics, handleMinibatchLivenessChan) + }, blobStore, minibatchStore, dispatcher, encodingStreamer, pool, logger, metrics, handleMinibatchLivenessChan) assert.NoError(t, err) config := bat.BatchConfirmerConfig{ diff --git a/disperser/batcher/minibatcher.go b/disperser/batcher/minibatcher.go index f30908ee7c..7f7c9fd36a 100644 --- a/disperser/batcher/minibatcher.go +++ b/disperser/batcher/minibatcher.go @@ -2,7 +2,6 @@ package batcher import ( "context" - "errors" "fmt" "time" @@ -32,13 +31,11 @@ type BatchState struct { type Minibatcher struct { MinibatcherConfig - BlobStore disperser.BlobStore - MinibatchStore MinibatchStore - Dispatcher disperser.Dispatcher - ChainState core.IndexedChainState - AssignmentCoordinator core.AssignmentCoordinator - EncodingStreamer *EncodingStreamer - Pool common.WorkerPool + BlobStore disperser.BlobStore + MinibatchStore MinibatchStore + Dispatcher disperser.Dispatcher + EncodingStreamer *EncodingStreamer + Pool common.WorkerPool // local state Batches map[uuid.UUID]*BatchState @@ -47,10 +44,8 @@ type Minibatcher struct { MinibatchIndex uint HeartbeatChan chan time.Time - Metrics *Metrics - ethClient common.EthClient - finalizer Finalizer - logger logging.Logger + Metrics *Metrics + logger logging.Logger } func NewMinibatcher( @@ -58,100 +53,31 @@ func NewMinibatcher( blobStore disperser.BlobStore, minibatchStore MinibatchStore, dispatcher disperser.Dispatcher, - chainState core.IndexedChainState, - assignmentCoordinator core.AssignmentCoordinator, encodingStreamer *EncodingStreamer, - ethClient common.EthClient, - finalizer Finalizer, workerpool common.WorkerPool, logger logging.Logger, metrics *Metrics, heartbeatChan chan time.Time, ) (*Minibatcher, error) { return &Minibatcher{ - MinibatcherConfig: config, - BlobStore: blobStore, - MinibatchStore: minibatchStore, - Dispatcher: dispatcher, - ChainState: chainState, - AssignmentCoordinator: assignmentCoordinator, - EncodingStreamer: encodingStreamer, - Pool: workerpool, + MinibatcherConfig: config, + BlobStore: blobStore, + MinibatchStore: minibatchStore, + Dispatcher: dispatcher, + EncodingStreamer: encodingStreamer, + Pool: workerpool, Batches: make(map[uuid.UUID]*BatchState), ReferenceBlockNumber: 0, CurrentBatchID: uuid.Nil, MinibatchIndex: 0, - ethClient: ethClient, - finalizer: finalizer, logger: logger.With("component", "Minibatcher"), HeartbeatChan: heartbeatChan, Metrics: metrics, }, nil } -func (b *Minibatcher) Start(ctx context.Context) error { - err := b.ChainState.Start(ctx) - if err != nil { - return err - } - - // Wait for few seconds for indexer to index blockchain - // This won't be needed when we switch to using Graph node - time.Sleep(indexerWarmupDelay) - err = b.EncodingStreamer.Start(ctx) - if err != nil { - return err - } - batchTrigger := b.EncodingStreamer.EncodedSizeNotifier - - b.finalizer.Start(ctx) - - go func() { - ticker := time.NewTicker(b.PullInterval) - defer ticker.Stop() - cancelFuncs := make([]context.CancelFunc, 0) - for { - select { - case <-ctx.Done(): - for _, cancel := range cancelFuncs { - cancel() - } - return - case <-ticker.C: - cancel, err := b.HandleSingleMinibatch(ctx) - if err != nil { - if errors.Is(err, errNoEncodedResults) { - b.logger.Warn("no encoded results to construct minibatch") - } else { - b.logger.Error("failed to process minibatch", "err", err) - } - } - if cancel != nil { - cancelFuncs = append(cancelFuncs, cancel) - } - case <-batchTrigger.Notify: - ticker.Stop() - - cancel, err := b.HandleSingleMinibatch(ctx) - if err != nil { - if errors.Is(err, errNoEncodedResults) { - b.logger.Warn("no encoded results to construct minibatch") - } else { - b.logger.Error("failed to process minibatch", "err", err) - } - } - if cancel != nil { - cancelFuncs = append(cancelFuncs, cancel) - } - ticker.Reset(b.PullInterval) - } - } - }() - return nil -} - func (b *Minibatcher) RecoverState(ctx context.Context) error { //TODO: Implement minibatch recovery return fmt.Errorf("minibatch state recovery not implemented") diff --git a/disperser/batcher/minibatcher_test.go b/disperser/batcher/minibatcher_test.go index 62faf2d89f..886e7fb587 100644 --- a/disperser/batcher/minibatcher_test.go +++ b/disperser/batcher/minibatcher_test.go @@ -14,7 +14,6 @@ import ( "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/disperser/batcher" "github.com/Layr-Labs/eigenda/disperser/batcher/inmem" - batchermock "github.com/Layr-Labs/eigenda/disperser/batcher/mock" dinmem "github.com/Layr-Labs/eigenda/disperser/common/inmem" dmock "github.com/Layr-Labs/eigenda/disperser/mock" "github.com/Layr-Labs/eigensdk-go/logging" @@ -84,7 +83,6 @@ func newMinibatcher(t *testing.T, config batcher.MinibatcherConfig) *minibatcher p, err := makeTestProver() assert.NoError(t, err) encoderClient := disperser.NewLocalEncoderClient(p) - finalizer := batchermock.NewFinalizer() asgn := &core.StdAssignmentCoordinator{} chainState.On("GetCurrentBlockNumber").Return(initialBlock, nil) metrics := batcher.NewMetrics("9100", logger) @@ -94,24 +92,18 @@ func newMinibatcher(t *testing.T, config batcher.MinibatcherConfig) *minibatcher ) encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, chainState, encoderClient, asgn, trigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger) assert.NoError(t, err) - ethClient := &cmock.MockEthClient{} pool := workerpool.New(int(config.MaxNumConnections)) - m, err := batcher.NewMinibatcher(config, blobStore, minibatchStore, dispatcher, chainState, asgn, encodingStreamer, ethClient, finalizer, pool, logger, metrics, handleMinibatchLivenessChan) - assert.NoError(t, err) - ics, err := chainState.GetIndexedOperatorState(context.Background(), 0, []core.QuorumID{0, 1}) + m, err := batcher.NewMinibatcher(config, blobStore, minibatchStore, dispatcher, encodingStreamer, pool, logger, metrics, handleMinibatchLivenessChan) assert.NoError(t, err) return &minibatcherComponents{ - minibatcher: m, - blobStore: blobStore, - minibatchStore: minibatchStore, - dispatcher: dispatcher, - chainState: ics, - assignmentCoordinator: asgn, - encodingStreamer: encodingStreamer, - pool: pool, - ethClient: ethClient, - logger: logger, - metrics: metrics, + minibatcher: m, + blobStore: blobStore, + minibatchStore: minibatchStore, + dispatcher: dispatcher, + encodingStreamer: encodingStreamer, + pool: pool, + logger: logger, + metrics: metrics, } } @@ -531,8 +523,7 @@ func TestMinibatcherTooManyPendingRequests(t *testing.T) { ctx := context.Background() mockWorkerPool := &cmock.MockWorkerpool{} // minibatcher with mock worker pool - finalizer := batchermock.NewFinalizer() - m, err := batcher.NewMinibatcher(defaultConfig, c.blobStore, c.minibatchStore, c.dispatcher, c.minibatcher.ChainState, c.assignmentCoordinator, c.encodingStreamer, c.ethClient, finalizer, mockWorkerPool, c.logger, c.metrics, handleMinibatchLivenessChan) + m, err := batcher.NewMinibatcher(defaultConfig, c.blobStore, c.minibatchStore, c.dispatcher, c.encodingStreamer, mockWorkerPool, c.logger, c.metrics, handleMinibatchLivenessChan) assert.NoError(t, err) mockWorkerPool.On("WaitingQueueSize").Return(int(defaultConfig.MaxNumConnections + 1)).Once() _, err = m.HandleSingleMinibatch(ctx) diff --git a/disperser/batcher/orchestrator.go b/disperser/batcher/orchestrator.go index b4f50c6ef2..2f31e7fd14 100644 --- a/disperser/batcher/orchestrator.go +++ b/disperser/batcher/orchestrator.go @@ -2,6 +2,7 @@ package batcher import ( "context" + "errors" "time" "github.com/Layr-Labs/eigenda/common" @@ -74,7 +75,7 @@ func NewOrchestrator( } var miniBatcher *Minibatcher - miniBatcher, err = NewMinibatcher(config.MinibatcherConfig, queue, minibatchStore, dispatcher, chainState, assignmentCoordinator, encodingStreamer, ethClient, finalizer, encodingWorkerPool, logger, metrics, heartbeatChan) + miniBatcher, err = NewMinibatcher(config.MinibatcherConfig, queue, minibatchStore, dispatcher, encodingStreamer, encodingWorkerPool, logger, metrics, heartbeatChan) if err != nil { return nil, err } @@ -105,9 +106,63 @@ func NewOrchestrator( } func (o *Orchestrator) Start(ctx context.Context) error { - err := o.MiniBatcher.Start(ctx) + err := o.ChainState.Start(ctx) if err != nil { return err } + + // Wait for few seconds for indexer to index blockchain + // This won't be needed when we switch to using Graph node + time.Sleep(indexerWarmupDelay) + + err = o.EncodingStreamer.Start(ctx) + if err != nil { + return err + } + batchTrigger := o.EncodingStreamer.EncodedSizeNotifier + + o.finalizer.Start(ctx) + + go func() { + ticker := time.NewTicker(o.PullInterval) + defer ticker.Stop() + cancelFuncs := make([]context.CancelFunc, 0) + for { + select { + case <-ctx.Done(): + for _, cancel := range cancelFuncs { + cancel() + } + return + case <-ticker.C: + cancel, err := o.MiniBatcher.HandleSingleMinibatch(ctx) + if err != nil { + if errors.Is(err, errNoEncodedResults) { + o.logger.Warn("no encoded results to construct minibatch") + } else { + o.logger.Error("failed to process minibatch", "err", err) + } + } + if cancel != nil { + cancelFuncs = append(cancelFuncs, cancel) + } + case <-batchTrigger.Notify: + ticker.Stop() + + cancel, err := o.MiniBatcher.HandleSingleMinibatch(ctx) + if err != nil { + if errors.Is(err, errNoEncodedResults) { + o.logger.Warn("no encoded results to construct minibatch") + } else { + o.logger.Error("failed to process minibatch", "err", err) + } + } + if cancel != nil { + cancelFuncs = append(cancelFuncs, cancel) + } + ticker.Reset(o.PullInterval) + } + } + }() return nil }