Skip to content

Commit

Permalink
Revert batcher to original
Browse files Browse the repository at this point in the history
Move minibatcher branch logic to main
  • Loading branch information
pschork committed Aug 20, 2024
1 parent e7c9c43 commit 980ac9d
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 108 deletions.
82 changes: 76 additions & 6 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,73 @@ import (
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/core/types"
"github.com/gammazero/workerpool"
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
"github.com/prometheus/client_golang/prometheus"
"github.com/wealdtech/go-merkletree/v2"
)

const (
QuantizationFactor = uint(1)
indexerWarmupDelay = 2 * time.Second
)

type BatchPlan struct {
IncludedBlobs []*disperser.BlobMetadata
Quorums map[core.QuorumID]QuorumInfo
State *core.IndexedOperatorState
}

type QuorumInfo struct {
Assignments map[core.OperatorID]core.Assignment
Info core.AssignmentInfo
QuantizationFactor uint
}

type TimeoutConfig struct {
EncodingTimeout time.Duration
AttestationTimeout time.Duration
ChainReadTimeout time.Duration
ChainWriteTimeout time.Duration
ChainStateTimeout time.Duration
TxnBroadcastTimeout time.Duration
}

type Config struct {
PullInterval time.Duration
FinalizerInterval time.Duration
FinalizerPoolSize int
EncoderSocket string
SRSOrder int
NumConnections int
EncodingRequestQueueSize int
// BatchSizeMBLimit is the maximum size of a batch in MB
BatchSizeMBLimit uint
MaxNumRetriesPerBlob uint

FinalizationBlockDelay uint

TargetNumChunks uint
MaxBlobsToFetchFromStore int

EnableMinibatch bool
BatchstoreTableName string
MinibatcherConfig MinibatcherConfig
}

type Batcher struct {
Config
TimeoutConfig

Queue disperser.BlobStore
Dispatcher disperser.Dispatcher
Queue disperser.BlobStore
Dispatcher disperser.Dispatcher
EncoderClient disperser.EncoderClient

ChainState core.IndexedChainState
AssignmentCoordinator core.AssignmentCoordinator
EncodingStreamer *EncodingStreamer
Aggregator core.SignatureAggregator
EncodingStreamer *EncodingStreamer
Transactor core.Transactor
TransactionManager TxnManager
Metrics *Metrics
Expand All @@ -49,7 +99,7 @@ func NewBatcher(
dispatcher disperser.Dispatcher,
chainState core.IndexedChainState,
assignmentCoordinator core.AssignmentCoordinator,
encodingStreamer *EncodingStreamer,
encoderClient disperser.EncoderClient,
aggregator core.SignatureAggregator,
ethClient common.EthClient,
finalizer Finalizer,
Expand All @@ -59,12 +109,32 @@ func NewBatcher(
metrics *Metrics,
heartbeatChan chan time.Time,
) (*Batcher, error) {
batchTrigger := NewEncodedSizeNotifier(
make(chan struct{}, 1),
uint64(config.BatchSizeMBLimit)*1024*1024, // convert to bytes
)
streamerConfig := StreamerConfig{
SRSOrder: config.SRSOrder,
EncodingRequestTimeout: config.PullInterval,
EncodingQueueLimit: config.EncodingRequestQueueSize,
TargetNumChunks: config.TargetNumChunks,
MaxBlobsToFetchFromStore: config.MaxBlobsToFetchFromStore,
FinalizationBlockDelay: config.FinalizationBlockDelay,
ChainStateTimeout: timeoutConfig.ChainStateTimeout,
}
encodingWorkerPool := workerpool.New(config.NumConnections)
encodingStreamer, err := NewEncodingStreamer(streamerConfig, queue, chainState, encoderClient, assignmentCoordinator, batchTrigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger)
if err != nil {
return nil, err
}

return &Batcher{
Config: config,
TimeoutConfig: timeoutConfig,

Queue: queue,
Dispatcher: dispatcher,
Queue: queue,
Dispatcher: dispatcher,
EncoderClient: encoderClient,

ChainState: chainState,
AssignmentCoordinator: assignmentCoordinator,
Expand Down
10 changes: 1 addition & 9 deletions disperser/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@ import (

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/gammazero/workerpool"

cmock "github.com/Layr-Labs/eigenda/common/mock"
"github.com/Layr-Labs/eigenda/core"
coremock "github.com/Layr-Labs/eigenda/core/mock"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/batcher"
bat "github.com/Layr-Labs/eigenda/disperser/batcher"
batchermock "github.com/Layr-Labs/eigenda/disperser/batcher/mock"
batmock "github.com/Layr-Labs/eigenda/disperser/batcher/mock"
Expand Down Expand Up @@ -124,18 +122,12 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time.

metrics := bat.NewMetrics("9100", logger)

trigger := batcher.NewEncodedSizeNotifier(
make(chan struct{}, 1),
10*1024*1024,
)
encodingWorkerPool := workerpool.New(10)
encoderClient := disperser.NewLocalEncoderClient(p)
encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, cst, encoderClient, asgn, trigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger)
finalizer := batchermock.NewFinalizer()
ethClient := &cmock.MockEthClient{}
txnManager := batmock.NewTxnManager()

b, err := bat.NewBatcher(config, timeoutConfig, blobStore, dispatcher, cst, asgn, encodingStreamer, agg, ethClient, finalizer, transactor, txnManager, logger, metrics, handleBatchLivenessChan)
b, err := bat.NewBatcher(config, timeoutConfig, blobStore, dispatcher, cst, asgn, encoderClient, agg, ethClient, finalizer, transactor, txnManager, logger, metrics, handleBatchLivenessChan)
assert.NoError(t, err)

var mu sync.Mutex
Expand Down
4 changes: 0 additions & 4 deletions disperser/batcher/batchstore/minibatch_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ const (
blobMinibatchMappingSKPrefix = "BLOB_MINIBATCH_MAPPING#"
)

type Config struct {
TableName string
}

type MinibatchStore struct {
dynamoDBClient *commondynamodb.Client
tableName string
Expand Down
6 changes: 0 additions & 6 deletions disperser/batcher/minibatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ type MinibatcherConfig struct {
MaxNumConnections uint
MaxNumRetriesPerBlob uint
MaxNumRetriesPerDispersal uint
BatchstoreTableName string
}

type BatchState struct {
Expand Down Expand Up @@ -158,11 +157,6 @@ func (b *Minibatcher) RecoverState(ctx context.Context) error {
return fmt.Errorf("minibatch state recovery not implemented")
}

func (b *Minibatcher) ProcessConfirmedMinibatch(ctx context.Context, receiptOrErr *ReceiptOrErr) error {
//TODO: Implement process confirmed minibatch
return fmt.Errorf("process confirmed minibatch not implemented")
}

func (b *Minibatcher) PopBatchState(batchID uuid.UUID) *BatchState {
batchState, ok := b.Batches[batchID]
if !ok {
Expand Down
75 changes: 6 additions & 69 deletions disperser/batcher/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,52 +11,6 @@ import (
"github.com/gammazero/workerpool"
)

const (
QuantizationFactor = uint(1)
indexerWarmupDelay = 2 * time.Second
)

type BatchPlan struct {
IncludedBlobs []*disperser.BlobMetadata
Quorums map[core.QuorumID]QuorumInfo
State *core.IndexedOperatorState
}

type QuorumInfo struct {
Assignments map[core.OperatorID]core.Assignment
Info core.AssignmentInfo
QuantizationFactor uint
}

type TimeoutConfig struct {
EncodingTimeout time.Duration
AttestationTimeout time.Duration
ChainReadTimeout time.Duration
ChainWriteTimeout time.Duration
ChainStateTimeout time.Duration
TxnBroadcastTimeout time.Duration
}

type Config struct {
PullInterval time.Duration
FinalizerInterval time.Duration
FinalizerPoolSize int
EncoderSocket string
SRSOrder int
NumConnections int
EncodingRequestQueueSize int
// BatchSizeMBLimit is the maximum size of a batch in MB
BatchSizeMBLimit uint
MaxNumRetriesPerBlob uint

FinalizationBlockDelay uint

TargetNumChunks uint
MaxBlobsToFetchFromStore int
EnableMinibatch bool
MinibatcherConfig MinibatcherConfig
}

type Orchestrator struct {
Config
TimeoutConfig
Expand All @@ -79,7 +33,6 @@ type Orchestrator struct {
finalizer Finalizer
logger logging.Logger

Batcher *Batcher
MiniBatcher *Minibatcher
}

Expand Down Expand Up @@ -120,18 +73,10 @@ func NewOrchestrator(
return nil, err
}

var batcher *Batcher
var miniBatcher *Minibatcher
if config.EnableMinibatch {
miniBatcher, err = NewMinibatcher(config.MinibatcherConfig, queue, minibatchStore, dispatcher, chainState, assignmentCoordinator, encodingStreamer, ethClient, finalizer, encodingWorkerPool, logger, metrics, heartbeatChan)
if err != nil {
return nil, err
}
} else {
batcher, err = NewBatcher(config, timeoutConfig, queue, dispatcher, chainState, assignmentCoordinator, encodingStreamer, aggregator, ethClient, finalizer, transactor, txnManager, logger, metrics, heartbeatChan)
if err != nil {
return nil, err
}
miniBatcher, err = NewMinibatcher(config.MinibatcherConfig, queue, minibatchStore, dispatcher, chainState, assignmentCoordinator, encodingStreamer, ethClient, finalizer, encodingWorkerPool, logger, metrics, heartbeatChan)
if err != nil {
return nil, err
}

return &Orchestrator{
Expand All @@ -155,22 +100,14 @@ func NewOrchestrator(
finalizer: finalizer,
logger: logger.With("component", "Orchestrator"),
HeartbeatChan: heartbeatChan,
Batcher: batcher,
MiniBatcher: miniBatcher,
}, nil
}

func (o *Orchestrator) Start(ctx context.Context) error {
if o.Config.EnableMinibatch {
err := o.MiniBatcher.Start(ctx)
if err != nil {
return err
}
} else {
err := o.Batcher.Start(ctx)
if err != nil {
return err
}
err := o.MiniBatcher.Start(ctx)
if err != nil {
return err
}
return nil
}
2 changes: 1 addition & 1 deletion disperser/cmd/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func NewConfig(ctx *cli.Context) (Config, error) {
MaxBlobsToFetchFromStore: ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name),
FinalizationBlockDelay: ctx.GlobalUint(flags.FinalizationBlockDelayFlag.Name),
EnableMinibatch: ctx.Bool(flags.EnableMinibatchFlag.Name),
BatchstoreTableName: ctx.GlobalString(flags.BatchstoreTableNameFlag.Name),
MinibatcherConfig: batcher.MinibatcherConfig{
PullInterval: ctx.GlobalDuration(flags.MinibatcherPullIntervalFlag.Name),
MaxNumConnections: ctx.GlobalUint(flags.MaxNodeConnectionsFlag.Name),
MaxNumRetriesPerBlob: ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name),
MaxNumRetriesPerDispersal: ctx.GlobalUint(flags.MaxNumRetriesPerDispersalFlag.Name),
BatchstoreTableName: ctx.GlobalString(flags.BatchstoreTableNameFlag.Name),
},
},
TimeoutConfig: batcher.TimeoutConfig{
Expand Down
32 changes: 19 additions & 13 deletions disperser/cmd/batcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func RunBatcher(ctx *cli.Context) error {
return err
}

if config.BatcherConfig.EnableMinibatch && config.BatcherConfig.MinibatcherConfig.BatchstoreTableName == "" {
if config.BatcherConfig.EnableMinibatch && config.BatcherConfig.BatchstoreTableName == "" {
return errors.New("minibatch store table name required when minibatch is enabled")
}

Expand Down Expand Up @@ -244,19 +244,25 @@ func RunBatcher(ctx *cli.Context) error {
logger.Info("Enabled metrics for Batcher", "socket", httpSocket)
}

var minibatchStore batcher.MinibatchStore
if config.BatcherConfig.EnableMinibatch {
minibatchStore = batchstore.NewMinibatchStore(dynamoClient, logger, config.BatcherConfig.MinibatcherConfig.BatchstoreTableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second)
}

orchestrator, err := batcher.NewOrchestrator(config.BatcherConfig, config.TimeoutConfig, queue, minibatchStore, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics, handleBatchLivenessChan)
if err != nil {
return err
}

err = orchestrator.Start(context.Background())
if err != nil {
return err
minibatchStore := batchstore.NewMinibatchStore(dynamoClient, logger, config.BatcherConfig.BatchstoreTableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second)
orchestrator, err := batcher.NewOrchestrator(config.BatcherConfig, config.TimeoutConfig, queue, minibatchStore, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics, handleBatchLivenessChan)
if err != nil {
return err
}
err = orchestrator.Start(context.Background())
if err != nil {
return err
}
} else {
batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics, handleBatchLivenessChan)
if err != nil {
return err
}
err = batcher.Start(context.Background())
if err != nil {
return err
}
}

// Signal readiness
Expand Down

0 comments on commit 980ac9d

Please sign in to comment.