diff --git a/api/grpc/mock/node_disperser_client.go b/api/grpc/mock/node_disperser_client.go new file mode 100644 index 0000000000..a3d296dd21 --- /dev/null +++ b/api/grpc/mock/node_disperser_client.go @@ -0,0 +1,39 @@ +package mock + +import ( + "context" + + "github.com/Layr-Labs/eigenda/api/grpc/node" + "github.com/stretchr/testify/mock" + "google.golang.org/grpc" +) + +type MockNodeDispersalClient struct { + mock.Mock +} + +var _ node.DispersalClient = (*MockNodeDispersalClient)(nil) + +func NewMockDispersalClient() *MockNodeDispersalClient { + return &MockNodeDispersalClient{} +} + +func (m *MockNodeDispersalClient) StoreChunks(ctx context.Context, in *node.StoreChunksRequest, opts ...grpc.CallOption) (*node.StoreChunksReply, error) { + args := m.Called() + return args.Get(0).(*node.StoreChunksReply), args.Error(1) +} + +func (m *MockNodeDispersalClient) StoreBlobs(ctx context.Context, in *node.StoreBlobsRequest, opts ...grpc.CallOption) (*node.StoreBlobsReply, error) { + args := m.Called() + return args.Get(0).(*node.StoreBlobsReply), args.Error(1) +} + +func (m *MockNodeDispersalClient) AttestBatch(ctx context.Context, in *node.AttestBatchRequest, opts ...grpc.CallOption) (*node.AttestBatchReply, error) { + args := m.Called() + return args.Get(0).(*node.AttestBatchReply), args.Error(1) +} + +func (m *MockNodeDispersalClient) NodeInfo(ctx context.Context, in *node.NodeInfoRequest, opts ...grpc.CallOption) (*node.NodeInfoReply, error) { + args := m.Called() + return args.Get(0).(*node.NodeInfoReply), args.Error(1) +} diff --git a/disperser/batcher/grpc/dispatcher.go b/disperser/batcher/grpc/dispatcher.go index 9c14a433b8..99399ae25f 100644 --- a/disperser/batcher/grpc/dispatcher.go +++ b/disperser/batcher/grpc/dispatcher.go @@ -3,6 +3,7 @@ package dispatcher import ( "context" "errors" + "fmt" "time" commonpb "github.com/Layr-Labs/eigenda/api/grpc/common" @@ -196,6 +197,88 @@ func (c *dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.Blob return signatures, nil } +func (c *dispatcher) AttestBatch(ctx context.Context, state *core.IndexedOperatorState, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) (chan core.SigningMessage, error) { + batchHeaderHash, err := batchHeader.GetBatchHeaderHash() + if err != nil { + return nil, err + } + responseChan := make(chan core.SigningMessage, len(state.IndexedOperators)) + + for id, op := range state.IndexedOperators { + go func(op core.IndexedOperatorInfo, id core.OperatorID) { + conn, err := grpc.Dial( + core.OperatorSocket(op.Socket).GetDispersalSocket(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + c.logger.Error("disperser cannot connect to operator dispersal socket", "socket", core.OperatorSocket(op.Socket).GetDispersalSocket(), "err", err) + return + } + defer conn.Close() + + nodeClient := node.NewDispersalClient(conn) + + requestedAt := time.Now() + sig, err := c.SendAttestBatchRequest(ctx, nodeClient, blobHeaderHashes, batchHeader, &op) + latencyMs := float64(time.Since(requestedAt).Milliseconds()) + if err != nil { + responseChan <- core.SigningMessage{ + Err: err, + Signature: nil, + Operator: id, + BatchHeaderHash: batchHeaderHash, + AttestationLatencyMs: latencyMs, + } + c.metrics.ObserveLatency(id.Hex(), false, latencyMs) + } else { + responseChan <- core.SigningMessage{ + Signature: sig, + Operator: id, + BatchHeaderHash: batchHeaderHash, + AttestationLatencyMs: latencyMs, + Err: nil, + } + c.metrics.ObserveLatency(id.Hex(), true, latencyMs) + } + }(core.IndexedOperatorInfo{ + PubkeyG1: op.PubkeyG1, + PubkeyG2: op.PubkeyG2, + Socket: op.Socket, + }, id) + } + + return responseChan, nil +} + +func (c *dispatcher) SendAttestBatchRequest(ctx context.Context, nodeDispersalClient node.DispersalClient, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) { + ctx, cancel := context.WithTimeout(ctx, c.Timeout) + defer cancel() + // start := time.Now() + hashes := make([][]byte, len(blobHeaderHashes)) + for i, hash := range blobHeaderHashes { + hashes[i] = hash[:] + } + + request := &node.AttestBatchRequest{ + BatchHeader: getBatchHeaderMessage(batchHeader), + BlobHeaderHashes: hashes, + } + + c.logger.Debug("sending AttestBatch request to operator", "operator", op.Socket, "numBlobs", len(blobHeaderHashes), "requestMessageSize", proto.Size(request), "referenceBlockNumber", batchHeader.ReferenceBlockNumber) + opt := grpc.MaxCallSendMsgSize(60 * 1024 * 1024 * 1024) + reply, err := nodeDispersalClient.AttestBatch(ctx, request, opt) + if err != nil { + return nil, fmt.Errorf("failed to send AttestBatch request to operator %s: %w", core.OperatorSocket(op.Socket).GetDispersalSocket(), err) + } + + sigBytes := reply.GetSignature() + point, err := new(core.Signature).Deserialize(sigBytes) + if err != nil { + return nil, fmt.Errorf("failed to deserialize signature: %w", err) + } + return &core.Signature{G1Point: point}, nil +} + func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader) (*node.StoreChunksRequest, int64, error) { blobs := make([]*node.Blob, len(blobMessages)) totalSize := int64(0) diff --git a/disperser/batcher/grpc/dispatcher_test.go b/disperser/batcher/grpc/dispatcher_test.go new file mode 100644 index 0000000000..0a6ea539f2 --- /dev/null +++ b/disperser/batcher/grpc/dispatcher_test.go @@ -0,0 +1,59 @@ +package dispatcher_test + +import ( + "context" + "math/big" + "testing" + "time" + + grpcMock "github.com/Layr-Labs/eigenda/api/grpc/mock" + "github.com/Layr-Labs/eigenda/api/grpc/node" + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigenda/disperser/batcher" + dispatcher "github.com/Layr-Labs/eigenda/disperser/batcher/grpc" + "github.com/consensys/gnark-crypto/ecc/bn254" + "github.com/consensys/gnark-crypto/ecc/bn254/fp" + "github.com/stretchr/testify/assert" +) + +func newDispatcher(t *testing.T, config *dispatcher.Config) disperser.Dispatcher { + loggerConfig := common.DefaultLoggerConfig() + logger, err := common.NewLogger(loggerConfig) + assert.NoError(t, err) + metrics := batcher.NewMetrics("9091", logger) + return dispatcher.NewDispatcher(config, logger, metrics.DispatcherMetrics) +} + +func TestSendAttestBatchRequest(t *testing.T) { + dispatcher := newDispatcher(t, &dispatcher.Config{ + Timeout: 5 * time.Second, + }) + nodeClient := grpcMock.NewMockDispersalClient() + var X, Y fp.Element + X = *X.SetBigInt(big.NewInt(1)) + Y = *Y.SetBigInt(big.NewInt(2)) + signature := &core.Signature{ + G1Point: &core.G1Point{ + G1Affine: &bn254.G1Affine{ + X: X, + Y: Y, + }, + }, + } + sigBytes := signature.Bytes() + nodeClient.On("AttestBatch").Return(&node.AttestBatchReply{ + Signature: sigBytes[:], + }, nil) + sigReply, err := dispatcher.SendAttestBatchRequest(context.Background(), nodeClient, [][32]byte{{1}}, &core.BatchHeader{ + ReferenceBlockNumber: 10, + BatchRoot: [32]byte{1}, + }, &core.IndexedOperatorInfo{ + PubkeyG1: nil, + PubkeyG2: nil, + Socket: "localhost:8080", + }) + assert.NoError(t, err) + assert.Equal(t, signature, sigReply) +} diff --git a/disperser/disperser.go b/disperser/disperser.go index f9ea58782c..8603b4abfa 100644 --- a/disperser/disperser.go +++ b/disperser/disperser.go @@ -13,6 +13,7 @@ import ( "github.com/Layr-Labs/eigenda/encoding" disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser" + "github.com/Layr-Labs/eigenda/api/grpc/node" gcommon "github.com/ethereum/go-ethereum/common" ) @@ -178,6 +179,8 @@ type BlobStore interface { type Dispatcher interface { DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SigningMessage SendBlobsToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) + AttestBatch(ctx context.Context, state *core.IndexedOperatorState, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) (chan core.SigningMessage, error) + SendAttestBatchRequest(ctx context.Context, nodeDispersalClient node.DispersalClient, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) } // GenerateReverseIndexKey returns the key used to store the blob key in the reverse index diff --git a/disperser/mock/dispatcher.go b/disperser/mock/dispatcher.go index 8569e9779b..743de8ebd9 100644 --- a/disperser/mock/dispatcher.go +++ b/disperser/mock/dispatcher.go @@ -4,6 +4,7 @@ import ( "context" "errors" + "github.com/Layr-Labs/eigenda/api/grpc/node" "github.com/Layr-Labs/eigenda/core" coremock "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/disperser" @@ -72,3 +73,19 @@ func (d *Dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.Blob } return args.Get(0).([]*core.Signature), args.Error(1) } + +func (d *Dispatcher) AttestBatch(ctx context.Context, state *core.IndexedOperatorState, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) (chan core.SigningMessage, error) { + args := d.Called(ctx, state, blobHeaderHashes, batchHeader) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(chan core.SigningMessage), args.Error(1) +} + +func (d *Dispatcher) SendAttestBatchRequest(ctx context.Context, nodeDispersalClient node.DispersalClient, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) { + args := d.Called(ctx, nodeDispersalClient, blobHeaderHashes, batchHeader, op) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*core.Signature), args.Error(1) +}