Skip to content

Commit

Permalink
Refactor and consolidate encoding client/streamer init
Browse files Browse the repository at this point in the history
  • Loading branch information
pschork committed Aug 20, 2024
1 parent f8678a7 commit e7c9c43
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 32 deletions.
34 changes: 6 additions & 28 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/core/types"
"github.com/gammazero/workerpool"
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -26,14 +25,13 @@ type Batcher struct {
Config
TimeoutConfig

Queue disperser.BlobStore
Dispatcher disperser.Dispatcher
EncoderClient disperser.EncoderClient
Queue disperser.BlobStore
Dispatcher disperser.Dispatcher

ChainState core.IndexedChainState
AssignmentCoordinator core.AssignmentCoordinator
Aggregator core.SignatureAggregator
EncodingStreamer *EncodingStreamer
Aggregator core.SignatureAggregator
Transactor core.Transactor
TransactionManager TxnManager
Metrics *Metrics
Expand All @@ -51,7 +49,7 @@ func NewBatcher(
dispatcher disperser.Dispatcher,
chainState core.IndexedChainState,
assignmentCoordinator core.AssignmentCoordinator,
encoderClient disperser.EncoderClient,
encodingStreamer *EncodingStreamer,
aggregator core.SignatureAggregator,
ethClient common.EthClient,
finalizer Finalizer,
Expand All @@ -61,32 +59,12 @@ func NewBatcher(
metrics *Metrics,
heartbeatChan chan time.Time,
) (*Batcher, error) {
batchTrigger := NewEncodedSizeNotifier(
make(chan struct{}, 1),
uint64(config.BatchSizeMBLimit)*1024*1024, // convert to bytes
)
streamerConfig := StreamerConfig{
SRSOrder: config.SRSOrder,
EncodingRequestTimeout: config.PullInterval,
EncodingQueueLimit: config.EncodingRequestQueueSize,
TargetNumChunks: config.TargetNumChunks,
MaxBlobsToFetchFromStore: config.MaxBlobsToFetchFromStore,
FinalizationBlockDelay: config.FinalizationBlockDelay,
ChainStateTimeout: timeoutConfig.ChainStateTimeout,
}
encodingWorkerPool := workerpool.New(config.NumConnections)
encodingStreamer, err := NewEncodingStreamer(streamerConfig, queue, chainState, encoderClient, assignmentCoordinator, batchTrigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger)
if err != nil {
return nil, err
}

return &Batcher{
Config: config,
TimeoutConfig: timeoutConfig,

Queue: queue,
Dispatcher: dispatcher,
EncoderClient: encoderClient,
Queue: queue,
Dispatcher: dispatcher,

ChainState: chainState,
AssignmentCoordinator: assignmentCoordinator,
Expand Down
12 changes: 9 additions & 3 deletions disperser/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/gammazero/workerpool"

cmock "github.com/Layr-Labs/eigenda/common/mock"
"github.com/Layr-Labs/eigenda/core"
coremock "github.com/Layr-Labs/eigenda/core/mock"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/batcher"
bat "github.com/Layr-Labs/eigenda/disperser/batcher"
batchermock "github.com/Layr-Labs/eigenda/disperser/batcher/mock"
batmock "github.com/Layr-Labs/eigenda/disperser/batcher/mock"
Expand All @@ -41,7 +43,6 @@ type batcherComponents struct {
transactor *coremock.MockTransactor
txnManager *batchermock.MockTxnManager
blobStore disperser.BlobStore
encoderClient *disperser.LocalEncoderClient
encodingStreamer *bat.EncodingStreamer
ethClient *cmock.MockEthClient
dispatcher *dmock.Dispatcher
Expand Down Expand Up @@ -123,12 +124,18 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time.

metrics := bat.NewMetrics("9100", logger)

trigger := batcher.NewEncodedSizeNotifier(
make(chan struct{}, 1),
10*1024*1024,
)
encodingWorkerPool := workerpool.New(10)
encoderClient := disperser.NewLocalEncoderClient(p)
encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, cst, encoderClient, asgn, trigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger)
finalizer := batchermock.NewFinalizer()
ethClient := &cmock.MockEthClient{}
txnManager := batmock.NewTxnManager()

b, err := bat.NewBatcher(config, timeoutConfig, blobStore, dispatcher, cst, asgn, encoderClient, agg, ethClient, finalizer, transactor, txnManager, logger, metrics, handleBatchLivenessChan)
b, err := bat.NewBatcher(config, timeoutConfig, blobStore, dispatcher, cst, asgn, encodingStreamer, agg, ethClient, finalizer, transactor, txnManager, logger, metrics, handleBatchLivenessChan)
assert.NoError(t, err)

var mu sync.Mutex
Expand All @@ -153,7 +160,6 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time.
transactor: transactor,
txnManager: txnManager,
blobStore: blobStore,
encoderClient: encoderClient,
encodingStreamer: b.EncodingStreamer,
ethClient: ethClient,
dispatcher: dispatcher,
Expand Down
2 changes: 1 addition & 1 deletion disperser/batcher/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func NewOrchestrator(
return nil, err
}
} else {
batcher, err = NewBatcher(config, timeoutConfig, queue, dispatcher, chainState, assignmentCoordinator, encoderClient, aggregator, ethClient, finalizer, transactor, txnManager, logger, metrics, heartbeatChan)
batcher, err = NewBatcher(config, timeoutConfig, queue, dispatcher, chainState, assignmentCoordinator, encodingStreamer, aggregator, ethClient, finalizer, transactor, txnManager, logger, metrics, heartbeatChan)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit e7c9c43

Please sign in to comment.