Skip to content

Commit

Permalink
Add finalizer
Browse files Browse the repository at this point in the history
  • Loading branch information
pschork committed Aug 20, 2024
1 parent 5c37123 commit f8678a7
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 10 deletions.
3 changes: 2 additions & 1 deletion disperser/batcher/batch_confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents {
dispatcher := dmock.NewDispatcher(state)
blobStore := inmem.NewBlobStore()
ethClient := &cmock.MockEthClient{}
finalizer := batmock.NewFinalizer()
txnManager := batmock.NewTxnManager()
minibatchStore := batcherinmem.NewMinibatchStore(logger)
encodingWorkerPool := workerpool.New(10)
Expand All @@ -76,7 +77,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents {
MaxNumConnections: 10,
MaxNumRetriesPerBlob: 2,
MaxNumRetriesPerDispersal: 1,
}, blobStore, minibatchStore, dispatcher, mockChainState, assignmentCoordinator, encodingStreamer, ethClient, pool, logger, metrics, handleMinibatchLivenessChan)
}, blobStore, minibatchStore, dispatcher, mockChainState, assignmentCoordinator, encodingStreamer, ethClient, finalizer, pool, logger, metrics, handleMinibatchLivenessChan)
assert.NoError(t, err)

config := bat.BatchConfirmerConfig{
Expand Down
28 changes: 27 additions & 1 deletion disperser/batcher/minibatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Minibatcher struct {

Metrics *Metrics
ethClient common.EthClient
finalizer Finalizer
logger logging.Logger
}

Expand All @@ -62,6 +63,7 @@ func NewMinibatcher(
assignmentCoordinator core.AssignmentCoordinator,
encodingStreamer *EncodingStreamer,
ethClient common.EthClient,
finalizer Finalizer,
workerpool common.WorkerPool,
logger logging.Logger,
metrics *Metrics,
Expand All @@ -83,6 +85,7 @@ func NewMinibatcher(
MinibatchIndex: 0,

ethClient: ethClient,
finalizer: finalizer,
logger: logger.With("component", "Minibatcher"),
HeartbeatChan: heartbeatChan,
Metrics: metrics,
Expand All @@ -94,9 +97,18 @@ func (b *Minibatcher) Start(ctx context.Context) error {
if err != nil {
return err
}

// Wait for few seconds for indexer to index blockchain
// This won't be needed when we switch to using Graph node
time.Sleep(indexerWarmupDelay)
err = b.EncodingStreamer.Start(ctx)
if err != nil {
return err
}
batchTrigger := b.EncodingStreamer.EncodedSizeNotifier

b.finalizer.Start(ctx)

go func() {
ticker := time.NewTicker(b.PullInterval)
defer ticker.Stop()
Expand All @@ -120,10 +132,24 @@ func (b *Minibatcher) Start(ctx context.Context) error {
if cancel != nil {
cancelFuncs = append(cancelFuncs, cancel)
}
case <-batchTrigger.Notify:
ticker.Stop()

cancel, err := b.HandleSingleMinibatch(ctx)
if err != nil {
if errors.Is(err, errNoEncodedResults) {
b.logger.Warn("no encoded results to construct minibatch")
} else {
b.logger.Error("failed to process minibatch", "err", err)
}
}
if cancel != nil {
cancelFuncs = append(cancelFuncs, cancel)
}
ticker.Reset(b.PullInterval)
}
}
}()

return nil
}

Expand Down
7 changes: 5 additions & 2 deletions disperser/batcher/minibatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/batcher"
"github.com/Layr-Labs/eigenda/disperser/batcher/inmem"
batchermock "github.com/Layr-Labs/eigenda/disperser/batcher/mock"
dinmem "github.com/Layr-Labs/eigenda/disperser/common/inmem"
dmock "github.com/Layr-Labs/eigenda/disperser/mock"
"github.com/Layr-Labs/eigensdk-go/logging"
Expand Down Expand Up @@ -83,6 +84,7 @@ func newMinibatcher(t *testing.T, config batcher.MinibatcherConfig) *minibatcher
p, err := makeTestProver()
assert.NoError(t, err)
encoderClient := disperser.NewLocalEncoderClient(p)
finalizer := batchermock.NewFinalizer()
asgn := &core.StdAssignmentCoordinator{}
chainState.On("GetCurrentBlockNumber").Return(initialBlock, nil)
metrics := batcher.NewMetrics("9100", logger)
Expand All @@ -94,7 +96,7 @@ func newMinibatcher(t *testing.T, config batcher.MinibatcherConfig) *minibatcher
assert.NoError(t, err)
ethClient := &cmock.MockEthClient{}
pool := workerpool.New(int(config.MaxNumConnections))
m, err := batcher.NewMinibatcher(config, blobStore, minibatchStore, dispatcher, chainState, asgn, encodingStreamer, ethClient, pool, logger, metrics, handleMinibatchLivenessChan)
m, err := batcher.NewMinibatcher(config, blobStore, minibatchStore, dispatcher, chainState, asgn, encodingStreamer, ethClient, finalizer, pool, logger, metrics, handleMinibatchLivenessChan)
assert.NoError(t, err)
ics, err := chainState.GetIndexedOperatorState(context.Background(), 0, []core.QuorumID{0, 1})
assert.NoError(t, err)
Expand Down Expand Up @@ -529,7 +531,8 @@ func TestMinibatcherTooManyPendingRequests(t *testing.T) {
ctx := context.Background()
mockWorkerPool := &cmock.MockWorkerpool{}
// minibatcher with mock worker pool
m, err := batcher.NewMinibatcher(defaultConfig, c.blobStore, c.minibatchStore, c.dispatcher, c.minibatcher.ChainState, c.assignmentCoordinator, c.encodingStreamer, c.ethClient, mockWorkerPool, c.logger, c.metrics, handleMinibatchLivenessChan)
finalizer := batchermock.NewFinalizer()
m, err := batcher.NewMinibatcher(defaultConfig, c.blobStore, c.minibatchStore, c.dispatcher, c.minibatcher.ChainState, c.assignmentCoordinator, c.encodingStreamer, c.ethClient, finalizer, mockWorkerPool, c.logger, c.metrics, handleMinibatchLivenessChan)
assert.NoError(t, err)
mockWorkerPool.On("WaitingQueueSize").Return(int(defaultConfig.MaxNumConnections + 1)).Once()
_, err = m.HandleSingleMinibatch(ctx)
Expand Down
2 changes: 1 addition & 1 deletion disperser/batcher/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func NewOrchestrator(
var batcher *Batcher
var miniBatcher *Minibatcher
if config.EnableMinibatch {
miniBatcher, err = NewMinibatcher(config.MinibatcherConfig, queue, minibatchStore, dispatcher, chainState, assignmentCoordinator, encodingStreamer, ethClient, encodingWorkerPool, logger, metrics, heartbeatChan)
miniBatcher, err = NewMinibatcher(config.MinibatcherConfig, queue, minibatchStore, dispatcher, chainState, assignmentCoordinator, encodingStreamer, ethClient, finalizer, encodingWorkerPool, logger, metrics, heartbeatChan)
if err != nil {
return nil, err
}
Expand Down
5 changes: 0 additions & 5 deletions disperser/cmd/batcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,6 @@ func RunBatcher(ctx *cli.Context) error {
minibatchStore = batchstore.NewMinibatchStore(dynamoClient, logger, config.BatcherConfig.MinibatcherConfig.BatchstoreTableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second)
}

//batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics, handleBatchLivenessChan)
//if err != nil {
// return err
//}
//batcher.Start(context.Background())
orchestrator, err := batcher.NewOrchestrator(config.BatcherConfig, config.TimeoutConfig, queue, minibatchStore, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics, handleBatchLivenessChan)
if err != nil {
return err
Expand Down

0 comments on commit f8678a7

Please sign in to comment.