diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index f2f9028fee..1551dd0215 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -417,7 +417,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error { // Dispatch encoded batch log.Debug("Dispatching encoded batch...") stageTimer = time.Now() - update := b.Dispatcher.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader) + update := b.Dispatcher.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader, b.AttestationTimeout) log.Debug("DisperseBatch took", "duration", time.Since(stageTimer)) h, err := batch.State.OperatorState.Hash() if err != nil { diff --git a/disperser/batcher/grpc/dispatcher.go b/disperser/batcher/grpc/dispatcher.go index 5807a6f40d..ed69af2009 100644 --- a/disperser/batcher/grpc/dispatcher.go +++ b/disperser/batcher/grpc/dispatcher.go @@ -17,20 +17,13 @@ import ( "google.golang.org/protobuf/proto" ) -type Config struct { - Timeout time.Duration -} - type dispatcher struct { - *Config - logger logging.Logger metrics *batcher.DispatcherMetrics } -func NewDispatcher(cfg *Config, logger logging.Logger, metrics *batcher.DispatcherMetrics) *dispatcher { +func NewDispatcher(logger logging.Logger, metrics *batcher.DispatcherMetrics) *dispatcher { return &dispatcher{ - Config: cfg, logger: logger.With("component", "Dispatcher"), metrics: metrics, } @@ -38,16 +31,16 @@ func NewDispatcher(cfg *Config, logger logging.Logger, metrics *batcher.Dispatch var _ disperser.Dispatcher = (*dispatcher)(nil) -func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader) chan core.SigningMessage { +func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, timeout time.Duration) chan core.SigningMessage { update := make(chan core.SigningMessage, len(state.IndexedOperators)) // Disperse - c.sendAllChunks(ctx, state, blobs, batchHeader, update) + c.sendAllChunks(ctx, state, blobs, batchHeader, timeout, update) return update } -func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, update chan core.SigningMessage) { +func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, timeout time.Duration, update chan core.SigningMessage) { for id, op := range state.IndexedOperators { go func(op core.IndexedOperatorInfo, id core.OperatorID) { blobMessages := make([]*core.BlobMessage, 0) @@ -79,7 +72,9 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera } requestedAt := time.Now() - sig, err := c.sendChunks(ctx, blobMessages, batchHeader, &op) + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + sig, err := c.SendChunksToOperator(ctx, blobMessages, batchHeader, &op) latencyMs := float64(time.Since(requestedAt).Milliseconds()) if err != nil { update <- core.SigningMessage{ @@ -109,7 +104,7 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera } } -func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) { +func (c *dispatcher) SendChunksToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) { // TODO Add secure Grpc conn, err := grpc.Dial( @@ -123,8 +118,6 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage, defer conn.Close() gc := node.NewDispersalClient(conn) - ctx, cancel := context.WithTimeout(ctx, c.Timeout) - defer cancel() start := time.Now() request, totalSize, err := GetStoreChunksRequest(blobs, batchHeader) if err != nil { diff --git a/disperser/cmd/batcher/main.go b/disperser/cmd/batcher/main.go index 49868b4a3c..4d417559f5 100644 --- a/disperser/cmd/batcher/main.go +++ b/disperser/cmd/batcher/main.go @@ -98,9 +98,7 @@ func RunBatcher(ctx *cli.Context) error { metrics := batcher.NewMetrics(config.MetricsConfig.HTTPPort, logger) - dispatcher := dispatcher.NewDispatcher(&dispatcher.Config{ - Timeout: config.TimeoutConfig.AttestationTimeout, - }, logger, metrics.DispatcherMetrics) + dispatcher := dispatcher.NewDispatcher(logger, metrics.DispatcherMetrics) asgn := &core.StdAssignmentCoordinator{} var wallet walletsdk.Wallet diff --git a/disperser/disperser.go b/disperser/disperser.go index 78d078e594..9226e3db56 100644 --- a/disperser/disperser.go +++ b/disperser/disperser.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "strings" + "time" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/core" @@ -176,7 +177,12 @@ type BlobStore interface { } type Dispatcher interface { - DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SigningMessage + // DisperseBatch sends the blobs to the operators in the state and returns a channel to receive the signing messages + // Attestation timeout needs to be configured in the parameter to set the correct timeout for each dispersal request + DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader, time.Duration) chan core.SigningMessage + // SendChunksToOperator sends the blobs to the operator and returns the signature and error + // It uses the context in the parameter to send the dispersal requests + SendChunksToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) } // GenerateReverseIndexKey returns the key used to store the blob key in the reverse index diff --git a/disperser/mock/dispatcher.go b/disperser/mock/dispatcher.go index 59d13f686e..5e19a8a8f7 100644 --- a/disperser/mock/dispatcher.go +++ b/disperser/mock/dispatcher.go @@ -3,6 +3,7 @@ package mock import ( "context" "errors" + "time" "github.com/Layr-Labs/eigenda/core" coremock "github.com/Layr-Labs/eigenda/core/mock" @@ -23,7 +24,7 @@ func NewDispatcher(state *coremock.PrivateOperatorState) *Dispatcher { } } -func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, header *core.BatchHeader) chan core.SigningMessage { +func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, header *core.BatchHeader, timeout time.Duration) chan core.SigningMessage { args := d.Called() var nonSigners map[core.OperatorID]struct{} if args.Get(0) != nil { @@ -64,3 +65,8 @@ func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOpera return update } + +func (c *Dispatcher) SendChunksToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) { + args := c.Called(ctx, blobs, batchHeader, op) + return args.Get(0).(*core.Signature), args.Error(1) +} diff --git a/test/integration_test.go b/test/integration_test.go index a1fc47c04b..0d9e8786af 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -129,11 +129,8 @@ type TestDisperser struct { } func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser.BlobStore, logger logging.Logger) TestDisperser { - dispatcherConfig := &dispatcher.Config{ - Timeout: time.Second, - } batcherMetrics := batcher.NewMetrics("9100", logger) - dispatcher := dispatcher.NewDispatcher(dispatcherConfig, logger, batcherMetrics.DispatcherMetrics) + dispatcher := dispatcher.NewDispatcher(logger, batcherMetrics.DispatcherMetrics) transactor := &coremock.MockTransactor{} transactor.On("OperatorIDToAddress").Return(gethcommon.Address{}, nil)