diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index 1551dd0215..f2f9028fee 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -417,7 +417,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error { // Dispatch encoded batch log.Debug("Dispatching encoded batch...") stageTimer = time.Now() - update := b.Dispatcher.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader, b.AttestationTimeout) + update := b.Dispatcher.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader) log.Debug("DisperseBatch took", "duration", time.Since(stageTimer)) h, err := batch.State.OperatorState.Hash() if err != nil { diff --git a/disperser/batcher/grpc/dispatcher.go b/disperser/batcher/grpc/dispatcher.go index f2107e7311..5807a6f40d 100644 --- a/disperser/batcher/grpc/dispatcher.go +++ b/disperser/batcher/grpc/dispatcher.go @@ -17,13 +17,20 @@ import ( "google.golang.org/protobuf/proto" ) +type Config struct { + Timeout time.Duration +} + type dispatcher struct { + *Config + logger logging.Logger metrics *batcher.DispatcherMetrics } -func NewDispatcher(logger logging.Logger, metrics *batcher.DispatcherMetrics) *dispatcher { +func NewDispatcher(cfg *Config, logger logging.Logger, metrics *batcher.DispatcherMetrics) *dispatcher { return &dispatcher{ + Config: cfg, logger: logger.With("component", "Dispatcher"), metrics: metrics, } @@ -31,16 +38,16 @@ func NewDispatcher(logger logging.Logger, metrics *batcher.DispatcherMetrics) *d var _ disperser.Dispatcher = (*dispatcher)(nil) -func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, timeout time.Duration) chan core.SigningMessage { +func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader) chan core.SigningMessage { update := make(chan core.SigningMessage, len(state.IndexedOperators)) // Disperse - c.sendAllChunks(ctx, state, blobs, batchHeader, timeout, update) + c.sendAllChunks(ctx, state, blobs, batchHeader, update) return update } -func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, timeout time.Duration, update chan core.SigningMessage) { +func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, update chan core.SigningMessage) { for id, op := range state.IndexedOperators { go func(op core.IndexedOperatorInfo, id core.OperatorID) { blobMessages := make([]*core.BlobMessage, 0) @@ -72,9 +79,7 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera } requestedAt := time.Now() - ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - sig, err := c.SendChunksToOperator(ctxWithTimeout, blobMessages, batchHeader, &op) + sig, err := c.sendChunks(ctx, blobMessages, batchHeader, &op) latencyMs := float64(time.Since(requestedAt).Milliseconds()) if err != nil { update <- core.SigningMessage{ @@ -104,7 +109,7 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera } } -func (c *dispatcher) SendChunksToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) { +func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) { // TODO Add secure Grpc conn, err := grpc.Dial( @@ -118,6 +123,8 @@ func (c *dispatcher) SendChunksToOperator(ctx context.Context, blobs []*core.Blo defer conn.Close() gc := node.NewDispersalClient(conn) + ctx, cancel := context.WithTimeout(ctx, c.Timeout) + defer cancel() start := time.Now() request, totalSize, err := GetStoreChunksRequest(blobs, batchHeader) if err != nil { diff --git a/disperser/cmd/batcher/main.go b/disperser/cmd/batcher/main.go index 4d417559f5..49868b4a3c 100644 --- a/disperser/cmd/batcher/main.go +++ b/disperser/cmd/batcher/main.go @@ -98,7 +98,9 @@ func RunBatcher(ctx *cli.Context) error { metrics := batcher.NewMetrics(config.MetricsConfig.HTTPPort, logger) - dispatcher := dispatcher.NewDispatcher(logger, metrics.DispatcherMetrics) + dispatcher := dispatcher.NewDispatcher(&dispatcher.Config{ + Timeout: config.TimeoutConfig.AttestationTimeout, + }, logger, metrics.DispatcherMetrics) asgn := &core.StdAssignmentCoordinator{} var wallet walletsdk.Wallet diff --git a/disperser/disperser.go b/disperser/disperser.go index 9226e3db56..78d078e594 100644 --- a/disperser/disperser.go +++ b/disperser/disperser.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "strings" - "time" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/core" @@ -177,12 +176,7 @@ type BlobStore interface { } type Dispatcher interface { - // DisperseBatch sends the blobs to the operators in the state and returns a channel to receive the signing messages - // Attestation timeout needs to be configured in the parameter to set the correct timeout for each dispersal request - DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader, time.Duration) chan core.SigningMessage - // SendChunksToOperator sends the blobs to the operator and returns the signature and error - // It uses the context in the parameter to send the dispersal requests - SendChunksToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) + DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SigningMessage } // GenerateReverseIndexKey returns the key used to store the blob key in the reverse index diff --git a/disperser/mock/dispatcher.go b/disperser/mock/dispatcher.go index 5e19a8a8f7..59d13f686e 100644 --- a/disperser/mock/dispatcher.go +++ b/disperser/mock/dispatcher.go @@ -3,7 +3,6 @@ package mock import ( "context" "errors" - "time" "github.com/Layr-Labs/eigenda/core" coremock "github.com/Layr-Labs/eigenda/core/mock" @@ -24,7 +23,7 @@ func NewDispatcher(state *coremock.PrivateOperatorState) *Dispatcher { } } -func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, header *core.BatchHeader, timeout time.Duration) chan core.SigningMessage { +func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, header *core.BatchHeader) chan core.SigningMessage { args := d.Called() var nonSigners map[core.OperatorID]struct{} if args.Get(0) != nil { @@ -65,8 +64,3 @@ func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOpera return update } - -func (c *Dispatcher) SendChunksToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) { - args := c.Called(ctx, blobs, batchHeader, op) - return args.Get(0).(*core.Signature), args.Error(1) -} diff --git a/test/integration_test.go b/test/integration_test.go index 0d9e8786af..a1fc47c04b 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -129,8 +129,11 @@ type TestDisperser struct { } func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser.BlobStore, logger logging.Logger) TestDisperser { + dispatcherConfig := &dispatcher.Config{ + Timeout: time.Second, + } batcherMetrics := batcher.NewMetrics("9100", logger) - dispatcher := dispatcher.NewDispatcher(logger, batcherMetrics.DispatcherMetrics) + dispatcher := dispatcher.NewDispatcher(dispatcherConfig, logger, batcherMetrics.DispatcherMetrics) transactor := &coremock.MockTransactor{} transactor.On("OperatorIDToAddress").Return(gethcommon.Address{}, nil)