Skip to content

Commit

Permalink
[dispatcher] Add method to send AttestBatch request to nodes (#664)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Jul 26, 2024
1 parent 6bc5049 commit 58a2469
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 0 deletions.
39 changes: 39 additions & 0 deletions api/grpc/mock/node_disperser_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package mock

import (
"context"

"github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc"
)

type MockNodeDispersalClient struct {
mock.Mock
}

var _ node.DispersalClient = (*MockNodeDispersalClient)(nil)

func NewMockDispersalClient() *MockNodeDispersalClient {
return &MockNodeDispersalClient{}
}

func (m *MockNodeDispersalClient) StoreChunks(ctx context.Context, in *node.StoreChunksRequest, opts ...grpc.CallOption) (*node.StoreChunksReply, error) {
args := m.Called()
return args.Get(0).(*node.StoreChunksReply), args.Error(1)
}

func (m *MockNodeDispersalClient) StoreBlobs(ctx context.Context, in *node.StoreBlobsRequest, opts ...grpc.CallOption) (*node.StoreBlobsReply, error) {
args := m.Called()
return args.Get(0).(*node.StoreBlobsReply), args.Error(1)
}

func (m *MockNodeDispersalClient) AttestBatch(ctx context.Context, in *node.AttestBatchRequest, opts ...grpc.CallOption) (*node.AttestBatchReply, error) {
args := m.Called()
return args.Get(0).(*node.AttestBatchReply), args.Error(1)
}

func (m *MockNodeDispersalClient) NodeInfo(ctx context.Context, in *node.NodeInfoRequest, opts ...grpc.CallOption) (*node.NodeInfoReply, error) {
args := m.Called()
return args.Get(0).(*node.NodeInfoReply), args.Error(1)
}
83 changes: 83 additions & 0 deletions disperser/batcher/grpc/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dispatcher
import (
"context"
"errors"
"fmt"
"time"

commonpb "github.com/Layr-Labs/eigenda/api/grpc/common"
Expand Down Expand Up @@ -196,6 +197,88 @@ func (c *dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.Blob
return signatures, nil
}

func (c *dispatcher) AttestBatch(ctx context.Context, state *core.IndexedOperatorState, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) (chan core.SigningMessage, error) {
batchHeaderHash, err := batchHeader.GetBatchHeaderHash()
if err != nil {
return nil, err
}
responseChan := make(chan core.SigningMessage, len(state.IndexedOperators))

for id, op := range state.IndexedOperators {
go func(op core.IndexedOperatorInfo, id core.OperatorID) {
conn, err := grpc.Dial(
core.OperatorSocket(op.Socket).GetDispersalSocket(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
c.logger.Error("disperser cannot connect to operator dispersal socket", "socket", core.OperatorSocket(op.Socket).GetDispersalSocket(), "err", err)
return
}
defer conn.Close()

nodeClient := node.NewDispersalClient(conn)

requestedAt := time.Now()
sig, err := c.SendAttestBatchRequest(ctx, nodeClient, blobHeaderHashes, batchHeader, &op)
latencyMs := float64(time.Since(requestedAt).Milliseconds())
if err != nil {
responseChan <- core.SigningMessage{
Err: err,
Signature: nil,
Operator: id,
BatchHeaderHash: batchHeaderHash,
AttestationLatencyMs: latencyMs,
}
c.metrics.ObserveLatency(id.Hex(), false, latencyMs)
} else {
responseChan <- core.SigningMessage{
Signature: sig,
Operator: id,
BatchHeaderHash: batchHeaderHash,
AttestationLatencyMs: latencyMs,
Err: nil,
}
c.metrics.ObserveLatency(id.Hex(), true, latencyMs)
}
}(core.IndexedOperatorInfo{
PubkeyG1: op.PubkeyG1,
PubkeyG2: op.PubkeyG2,
Socket: op.Socket,
}, id)
}

return responseChan, nil
}

func (c *dispatcher) SendAttestBatchRequest(ctx context.Context, nodeDispersalClient node.DispersalClient, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) {
ctx, cancel := context.WithTimeout(ctx, c.Timeout)
defer cancel()
// start := time.Now()
hashes := make([][]byte, len(blobHeaderHashes))
for i, hash := range blobHeaderHashes {
hashes[i] = hash[:]
}

request := &node.AttestBatchRequest{
BatchHeader: getBatchHeaderMessage(batchHeader),
BlobHeaderHashes: hashes,
}

c.logger.Debug("sending AttestBatch request to operator", "operator", op.Socket, "numBlobs", len(blobHeaderHashes), "requestMessageSize", proto.Size(request), "referenceBlockNumber", batchHeader.ReferenceBlockNumber)
opt := grpc.MaxCallSendMsgSize(60 * 1024 * 1024 * 1024)
reply, err := nodeDispersalClient.AttestBatch(ctx, request, opt)
if err != nil {
return nil, fmt.Errorf("failed to send AttestBatch request to operator %s: %w", core.OperatorSocket(op.Socket).GetDispersalSocket(), err)
}

sigBytes := reply.GetSignature()
point, err := new(core.Signature).Deserialize(sigBytes)
if err != nil {
return nil, fmt.Errorf("failed to deserialize signature: %w", err)
}
return &core.Signature{G1Point: point}, nil
}

func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader) (*node.StoreChunksRequest, int64, error) {
blobs := make([]*node.Blob, len(blobMessages))
totalSize := int64(0)
Expand Down
59 changes: 59 additions & 0 deletions disperser/batcher/grpc/dispatcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package dispatcher_test

import (
"context"
"math/big"
"testing"
"time"

grpcMock "github.com/Layr-Labs/eigenda/api/grpc/mock"
"github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/batcher"
dispatcher "github.com/Layr-Labs/eigenda/disperser/batcher/grpc"
"github.com/consensys/gnark-crypto/ecc/bn254"
"github.com/consensys/gnark-crypto/ecc/bn254/fp"
"github.com/stretchr/testify/assert"
)

func newDispatcher(t *testing.T, config *dispatcher.Config) disperser.Dispatcher {
loggerConfig := common.DefaultLoggerConfig()
logger, err := common.NewLogger(loggerConfig)
assert.NoError(t, err)
metrics := batcher.NewMetrics("9091", logger)
return dispatcher.NewDispatcher(config, logger, metrics.DispatcherMetrics)
}

func TestSendAttestBatchRequest(t *testing.T) {
dispatcher := newDispatcher(t, &dispatcher.Config{
Timeout: 5 * time.Second,
})
nodeClient := grpcMock.NewMockDispersalClient()
var X, Y fp.Element
X = *X.SetBigInt(big.NewInt(1))
Y = *Y.SetBigInt(big.NewInt(2))
signature := &core.Signature{
G1Point: &core.G1Point{
G1Affine: &bn254.G1Affine{
X: X,
Y: Y,
},
},
}
sigBytes := signature.Bytes()
nodeClient.On("AttestBatch").Return(&node.AttestBatchReply{
Signature: sigBytes[:],
}, nil)
sigReply, err := dispatcher.SendAttestBatchRequest(context.Background(), nodeClient, [][32]byte{{1}}, &core.BatchHeader{
ReferenceBlockNumber: 10,
BatchRoot: [32]byte{1},
}, &core.IndexedOperatorInfo{
PubkeyG1: nil,
PubkeyG2: nil,
Socket: "localhost:8080",
})
assert.NoError(t, err)
assert.Equal(t, signature, sigReply)
}
3 changes: 3 additions & 0 deletions disperser/disperser.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/Layr-Labs/eigenda/encoding"

disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser"
"github.com/Layr-Labs/eigenda/api/grpc/node"
gcommon "github.com/ethereum/go-ethereum/common"
)

Expand Down Expand Up @@ -178,6 +179,8 @@ type BlobStore interface {
type Dispatcher interface {
DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SigningMessage
SendBlobsToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error)
AttestBatch(ctx context.Context, state *core.IndexedOperatorState, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) (chan core.SigningMessage, error)
SendAttestBatchRequest(ctx context.Context, nodeDispersalClient node.DispersalClient, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error)
}

// GenerateReverseIndexKey returns the key used to store the blob key in the reverse index
Expand Down
17 changes: 17 additions & 0 deletions disperser/mock/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"

"github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/Layr-Labs/eigenda/core"
coremock "github.com/Layr-Labs/eigenda/core/mock"
"github.com/Layr-Labs/eigenda/disperser"
Expand Down Expand Up @@ -72,3 +73,19 @@ func (d *Dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.Blob
}
return args.Get(0).([]*core.Signature), args.Error(1)
}

func (d *Dispatcher) AttestBatch(ctx context.Context, state *core.IndexedOperatorState, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) (chan core.SigningMessage, error) {
args := d.Called(ctx, state, blobHeaderHashes, batchHeader)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(chan core.SigningMessage), args.Error(1)
}

func (d *Dispatcher) SendAttestBatchRequest(ctx context.Context, nodeDispersalClient node.DispersalClient, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) {
args := d.Called(ctx, nodeDispersalClient, blobHeaderHashes, batchHeader, op)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*core.Signature), args.Error(1)
}

0 comments on commit 58a2469

Please sign in to comment.