Skip to content

Commit

Permalink
Revert "Refactor dispatcher timeout" (#645)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Jul 18, 2024
1 parent 4f1ceba commit eaec149
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 25 deletions.
2 changes: 1 addition & 1 deletion disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
// Dispatch encoded batch
log.Debug("Dispatching encoded batch...")
stageTimer = time.Now()
update := b.Dispatcher.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader, b.AttestationTimeout)
update := b.Dispatcher.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader)
log.Debug("DisperseBatch took", "duration", time.Since(stageTimer))
h, err := batch.State.OperatorState.Hash()
if err != nil {
Expand Down
23 changes: 15 additions & 8 deletions disperser/batcher/grpc/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,37 @@ import (
"google.golang.org/protobuf/proto"
)

type Config struct {
Timeout time.Duration
}

type dispatcher struct {
*Config

logger logging.Logger
metrics *batcher.DispatcherMetrics
}

func NewDispatcher(logger logging.Logger, metrics *batcher.DispatcherMetrics) *dispatcher {
func NewDispatcher(cfg *Config, logger logging.Logger, metrics *batcher.DispatcherMetrics) *dispatcher {
return &dispatcher{
Config: cfg,
logger: logger.With("component", "Dispatcher"),
metrics: metrics,
}
}

var _ disperser.Dispatcher = (*dispatcher)(nil)

func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, timeout time.Duration) chan core.SigningMessage {
func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader) chan core.SigningMessage {
update := make(chan core.SigningMessage, len(state.IndexedOperators))

// Disperse
c.sendAllChunks(ctx, state, blobs, batchHeader, timeout, update)
c.sendAllChunks(ctx, state, blobs, batchHeader, update)

return update
}

func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, timeout time.Duration, update chan core.SigningMessage) {
func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, update chan core.SigningMessage) {
for id, op := range state.IndexedOperators {
go func(op core.IndexedOperatorInfo, id core.OperatorID) {
blobMessages := make([]*core.BlobMessage, 0)
Expand Down Expand Up @@ -72,9 +79,7 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera
}

requestedAt := time.Now()
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
sig, err := c.SendChunksToOperator(ctxWithTimeout, blobMessages, batchHeader, &op)
sig, err := c.sendChunks(ctx, blobMessages, batchHeader, &op)
latencyMs := float64(time.Since(requestedAt).Milliseconds())
if err != nil {
update <- core.SigningMessage{
Expand Down Expand Up @@ -104,7 +109,7 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera
}
}

func (c *dispatcher) SendChunksToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) {
func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) {
// TODO Add secure Grpc

conn, err := grpc.Dial(
Expand All @@ -118,6 +123,8 @@ func (c *dispatcher) SendChunksToOperator(ctx context.Context, blobs []*core.Blo
defer conn.Close()

gc := node.NewDispersalClient(conn)
ctx, cancel := context.WithTimeout(ctx, c.Timeout)
defer cancel()
start := time.Now()
request, totalSize, err := GetStoreChunksRequest(blobs, batchHeader)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion disperser/cmd/batcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ func RunBatcher(ctx *cli.Context) error {

metrics := batcher.NewMetrics(config.MetricsConfig.HTTPPort, logger)

dispatcher := dispatcher.NewDispatcher(logger, metrics.DispatcherMetrics)
dispatcher := dispatcher.NewDispatcher(&dispatcher.Config{
Timeout: config.TimeoutConfig.AttestationTimeout,
}, logger, metrics.DispatcherMetrics)
asgn := &core.StdAssignmentCoordinator{}

var wallet walletsdk.Wallet
Expand Down
8 changes: 1 addition & 7 deletions disperser/disperser.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"errors"
"fmt"
"strings"
"time"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
Expand Down Expand Up @@ -177,12 +176,7 @@ type BlobStore interface {
}

type Dispatcher interface {
// DisperseBatch sends the blobs to the operators in the state and returns a channel to receive the signing messages
// Attestation timeout needs to be configured in the parameter to set the correct timeout for each dispersal request
DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader, time.Duration) chan core.SigningMessage
// SendChunksToOperator sends the blobs to the operator and returns the signature and error
// It uses the context in the parameter to send the dispersal requests
SendChunksToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error)
DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SigningMessage
}

// GenerateReverseIndexKey returns the key used to store the blob key in the reverse index
Expand Down
8 changes: 1 addition & 7 deletions disperser/mock/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package mock
import (
"context"
"errors"
"time"

"github.com/Layr-Labs/eigenda/core"
coremock "github.com/Layr-Labs/eigenda/core/mock"
Expand All @@ -24,7 +23,7 @@ func NewDispatcher(state *coremock.PrivateOperatorState) *Dispatcher {
}
}

func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, header *core.BatchHeader, timeout time.Duration) chan core.SigningMessage {
func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, header *core.BatchHeader) chan core.SigningMessage {
args := d.Called()
var nonSigners map[core.OperatorID]struct{}
if args.Get(0) != nil {
Expand Down Expand Up @@ -65,8 +64,3 @@ func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOpera

return update
}

func (c *Dispatcher) SendChunksToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) {
args := c.Called(ctx, blobs, batchHeader, op)
return args.Get(0).(*core.Signature), args.Error(1)
}
5 changes: 4 additions & 1 deletion test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,11 @@ type TestDisperser struct {
}

func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser.BlobStore, logger logging.Logger) TestDisperser {
dispatcherConfig := &dispatcher.Config{
Timeout: time.Second,
}
batcherMetrics := batcher.NewMetrics("9100", logger)
dispatcher := dispatcher.NewDispatcher(logger, batcherMetrics.DispatcherMetrics)
dispatcher := dispatcher.NewDispatcher(dispatcherConfig, logger, batcherMetrics.DispatcherMetrics)

transactor := &coremock.MockTransactor{}
transactor.On("OperatorIDToAddress").Return(gethcommon.Address{}, nil)
Expand Down

0 comments on commit eaec149

Please sign in to comment.