From e7c9c4309a7898dbbff5a12a03b66f4d58580251 Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Mon, 19 Aug 2024 17:52:13 -0700 Subject: [PATCH] Refactor and consolidate encoding client/streamer init --- disperser/batcher/batcher.go | 34 ++++++------------------------- disperser/batcher/batcher_test.go | 12 ++++++++--- disperser/batcher/orchestrator.go | 2 +- 3 files changed, 16 insertions(+), 32 deletions(-) diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index 8d71234138..bb35b660b5 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -15,7 +15,6 @@ import ( "github.com/Layr-Labs/eigensdk-go/logging" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/core/types" - "github.com/gammazero/workerpool" "github.com/google/uuid" "github.com/hashicorp/go-multierror" "github.com/prometheus/client_golang/prometheus" @@ -26,14 +25,13 @@ type Batcher struct { Config TimeoutConfig - Queue disperser.BlobStore - Dispatcher disperser.Dispatcher - EncoderClient disperser.EncoderClient + Queue disperser.BlobStore + Dispatcher disperser.Dispatcher ChainState core.IndexedChainState AssignmentCoordinator core.AssignmentCoordinator - Aggregator core.SignatureAggregator EncodingStreamer *EncodingStreamer + Aggregator core.SignatureAggregator Transactor core.Transactor TransactionManager TxnManager Metrics *Metrics @@ -51,7 +49,7 @@ func NewBatcher( dispatcher disperser.Dispatcher, chainState core.IndexedChainState, assignmentCoordinator core.AssignmentCoordinator, - encoderClient disperser.EncoderClient, + encodingStreamer *EncodingStreamer, aggregator core.SignatureAggregator, ethClient common.EthClient, finalizer Finalizer, @@ -61,32 +59,12 @@ func NewBatcher( metrics *Metrics, heartbeatChan chan time.Time, ) (*Batcher, error) { - batchTrigger := NewEncodedSizeNotifier( - make(chan struct{}, 1), - uint64(config.BatchSizeMBLimit)*1024*1024, // convert to bytes - ) - streamerConfig := StreamerConfig{ - SRSOrder: config.SRSOrder, - EncodingRequestTimeout: config.PullInterval, - EncodingQueueLimit: config.EncodingRequestQueueSize, - TargetNumChunks: config.TargetNumChunks, - MaxBlobsToFetchFromStore: config.MaxBlobsToFetchFromStore, - FinalizationBlockDelay: config.FinalizationBlockDelay, - ChainStateTimeout: timeoutConfig.ChainStateTimeout, - } - encodingWorkerPool := workerpool.New(config.NumConnections) - encodingStreamer, err := NewEncodingStreamer(streamerConfig, queue, chainState, encoderClient, assignmentCoordinator, batchTrigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger) - if err != nil { - return nil, err - } - return &Batcher{ Config: config, TimeoutConfig: timeoutConfig, - Queue: queue, - Dispatcher: dispatcher, - EncoderClient: encoderClient, + Queue: queue, + Dispatcher: dispatcher, ChainState: chainState, AssignmentCoordinator: assignmentCoordinator, diff --git a/disperser/batcher/batcher_test.go b/disperser/batcher/batcher_test.go index 0a2e8d91a7..f57fea2a8b 100644 --- a/disperser/batcher/batcher_test.go +++ b/disperser/batcher/batcher_test.go @@ -12,11 +12,13 @@ import ( "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/gammazero/workerpool" cmock "github.com/Layr-Labs/eigenda/common/mock" "github.com/Layr-Labs/eigenda/core" coremock "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigenda/disperser/batcher" bat "github.com/Layr-Labs/eigenda/disperser/batcher" batchermock "github.com/Layr-Labs/eigenda/disperser/batcher/mock" batmock "github.com/Layr-Labs/eigenda/disperser/batcher/mock" @@ -41,7 +43,6 @@ type batcherComponents struct { transactor *coremock.MockTransactor txnManager *batchermock.MockTxnManager blobStore disperser.BlobStore - encoderClient *disperser.LocalEncoderClient encodingStreamer *bat.EncodingStreamer ethClient *cmock.MockEthClient dispatcher *dmock.Dispatcher @@ -123,12 +124,18 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time. metrics := bat.NewMetrics("9100", logger) + trigger := batcher.NewEncodedSizeNotifier( + make(chan struct{}, 1), + 10*1024*1024, + ) + encodingWorkerPool := workerpool.New(10) encoderClient := disperser.NewLocalEncoderClient(p) + encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, cst, encoderClient, asgn, trigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger) finalizer := batchermock.NewFinalizer() ethClient := &cmock.MockEthClient{} txnManager := batmock.NewTxnManager() - b, err := bat.NewBatcher(config, timeoutConfig, blobStore, dispatcher, cst, asgn, encoderClient, agg, ethClient, finalizer, transactor, txnManager, logger, metrics, handleBatchLivenessChan) + b, err := bat.NewBatcher(config, timeoutConfig, blobStore, dispatcher, cst, asgn, encodingStreamer, agg, ethClient, finalizer, transactor, txnManager, logger, metrics, handleBatchLivenessChan) assert.NoError(t, err) var mu sync.Mutex @@ -153,7 +160,6 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time. transactor: transactor, txnManager: txnManager, blobStore: blobStore, - encoderClient: encoderClient, encodingStreamer: b.EncodingStreamer, ethClient: ethClient, dispatcher: dispatcher, diff --git a/disperser/batcher/orchestrator.go b/disperser/batcher/orchestrator.go index 35b73c05b1..96bf52e1e3 100644 --- a/disperser/batcher/orchestrator.go +++ b/disperser/batcher/orchestrator.go @@ -128,7 +128,7 @@ func NewOrchestrator( return nil, err } } else { - batcher, err = NewBatcher(config, timeoutConfig, queue, dispatcher, chainState, assignmentCoordinator, encoderClient, aggregator, ethClient, finalizer, transactor, txnManager, logger, metrics, heartbeatChan) + batcher, err = NewBatcher(config, timeoutConfig, queue, dispatcher, chainState, assignmentCoordinator, encodingStreamer, aggregator, ethClient, finalizer, transactor, txnManager, logger, metrics, heartbeatChan) if err != nil { return nil, err }