Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dispatcher] Add method to send AttestBatch request to nodes #664

Merged
merged 1 commit into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions api/grpc/mock/node_disperser_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package mock

import (
"context"

"github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc"
)

type MockNodeDispersalClient struct {
mock.Mock
}

var _ node.DispersalClient = (*MockNodeDispersalClient)(nil)

func NewMockDispersalClient() *MockNodeDispersalClient {
return &MockNodeDispersalClient{}
}

func (m *MockNodeDispersalClient) StoreChunks(ctx context.Context, in *node.StoreChunksRequest, opts ...grpc.CallOption) (*node.StoreChunksReply, error) {
args := m.Called()
return args.Get(0).(*node.StoreChunksReply), args.Error(1)
}

func (m *MockNodeDispersalClient) StoreBlobs(ctx context.Context, in *node.StoreBlobsRequest, opts ...grpc.CallOption) (*node.StoreBlobsReply, error) {
args := m.Called()
return args.Get(0).(*node.StoreBlobsReply), args.Error(1)
}

func (m *MockNodeDispersalClient) AttestBatch(ctx context.Context, in *node.AttestBatchRequest, opts ...grpc.CallOption) (*node.AttestBatchReply, error) {
args := m.Called()
return args.Get(0).(*node.AttestBatchReply), args.Error(1)
}

func (m *MockNodeDispersalClient) NodeInfo(ctx context.Context, in *node.NodeInfoRequest, opts ...grpc.CallOption) (*node.NodeInfoReply, error) {
args := m.Called()
return args.Get(0).(*node.NodeInfoReply), args.Error(1)
}
83 changes: 83 additions & 0 deletions disperser/batcher/grpc/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dispatcher
import (
"context"
"errors"
"fmt"
"time"

commonpb "github.com/Layr-Labs/eigenda/api/grpc/common"
Expand Down Expand Up @@ -196,6 +197,88 @@ func (c *dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.Blob
return signatures, nil
}

func (c *dispatcher) AttestBatch(ctx context.Context, state *core.IndexedOperatorState, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) (chan core.SigningMessage, error) {
batchHeaderHash, err := batchHeader.GetBatchHeaderHash()
if err != nil {
return nil, err
}
responseChan := make(chan core.SigningMessage, len(state.IndexedOperators))

for id, op := range state.IndexedOperators {
go func(op core.IndexedOperatorInfo, id core.OperatorID) {
conn, err := grpc.Dial(
core.OperatorSocket(op.Socket).GetDispersalSocket(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
c.logger.Error("disperser cannot connect to operator dispersal socket", "socket", core.OperatorSocket(op.Socket).GetDispersalSocket(), "err", err)
return
}
defer conn.Close()

nodeClient := node.NewDispersalClient(conn)

requestedAt := time.Now()
sig, err := c.SendAttestBatchRequest(ctx, nodeClient, blobHeaderHashes, batchHeader, &op)
latencyMs := float64(time.Since(requestedAt).Milliseconds())
if err != nil {
responseChan <- core.SigningMessage{
Err: err,
Signature: nil,
Operator: id,
BatchHeaderHash: batchHeaderHash,
AttestationLatencyMs: latencyMs,
}
c.metrics.ObserveLatency(id.Hex(), false, latencyMs)
} else {
responseChan <- core.SigningMessage{
Signature: sig,
Operator: id,
BatchHeaderHash: batchHeaderHash,
AttestationLatencyMs: latencyMs,
Err: nil,
}
c.metrics.ObserveLatency(id.Hex(), true, latencyMs)
}
}(core.IndexedOperatorInfo{
PubkeyG1: op.PubkeyG1,
PubkeyG2: op.PubkeyG2,
Socket: op.Socket,
}, id)
}

return responseChan, nil
}

func (c *dispatcher) SendAttestBatchRequest(ctx context.Context, nodeDispersalClient node.DispersalClient, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) {
ctx, cancel := context.WithTimeout(ctx, c.Timeout)
defer cancel()
// start := time.Now()
hashes := make([][]byte, len(blobHeaderHashes))
for i, hash := range blobHeaderHashes {
hashes[i] = hash[:]
}

request := &node.AttestBatchRequest{
BatchHeader: getBatchHeaderMessage(batchHeader),
BlobHeaderHashes: hashes,
}

c.logger.Debug("sending AttestBatch request to operator", "operator", op.Socket, "numBlobs", len(blobHeaderHashes), "requestMessageSize", proto.Size(request), "referenceBlockNumber", batchHeader.ReferenceBlockNumber)
opt := grpc.MaxCallSendMsgSize(60 * 1024 * 1024 * 1024)
reply, err := nodeDispersalClient.AttestBatch(ctx, request, opt)
if err != nil {
return nil, fmt.Errorf("failed to send AttestBatch request to operator %s: %w", core.OperatorSocket(op.Socket).GetDispersalSocket(), err)
}

sigBytes := reply.GetSignature()
point, err := new(core.Signature).Deserialize(sigBytes)
if err != nil {
return nil, fmt.Errorf("failed to deserialize signature: %w", err)
}
return &core.Signature{G1Point: point}, nil
}

func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader) (*node.StoreChunksRequest, int64, error) {
blobs := make([]*node.Blob, len(blobMessages))
totalSize := int64(0)
Expand Down
59 changes: 59 additions & 0 deletions disperser/batcher/grpc/dispatcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package dispatcher_test

import (
"context"
"math/big"
"testing"
"time"

grpcMock "github.com/Layr-Labs/eigenda/api/grpc/mock"
"github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/batcher"
dispatcher "github.com/Layr-Labs/eigenda/disperser/batcher/grpc"
"github.com/consensys/gnark-crypto/ecc/bn254"
"github.com/consensys/gnark-crypto/ecc/bn254/fp"
"github.com/stretchr/testify/assert"
)

func newDispatcher(t *testing.T, config *dispatcher.Config) disperser.Dispatcher {
loggerConfig := common.DefaultLoggerConfig()
logger, err := common.NewLogger(loggerConfig)
assert.NoError(t, err)
metrics := batcher.NewMetrics("9091", logger)
return dispatcher.NewDispatcher(config, logger, metrics.DispatcherMetrics)
}

func TestSendAttestBatchRequest(t *testing.T) {
dispatcher := newDispatcher(t, &dispatcher.Config{
Timeout: 5 * time.Second,
})
nodeClient := grpcMock.NewMockDispersalClient()
var X, Y fp.Element
X = *X.SetBigInt(big.NewInt(1))
Y = *Y.SetBigInt(big.NewInt(2))
signature := &core.Signature{
G1Point: &core.G1Point{
G1Affine: &bn254.G1Affine{
X: X,
Y: Y,
},
},
}
sigBytes := signature.Bytes()
nodeClient.On("AttestBatch").Return(&node.AttestBatchReply{
Signature: sigBytes[:],
}, nil)
sigReply, err := dispatcher.SendAttestBatchRequest(context.Background(), nodeClient, [][32]byte{{1}}, &core.BatchHeader{
ReferenceBlockNumber: 10,
BatchRoot: [32]byte{1},
}, &core.IndexedOperatorInfo{
PubkeyG1: nil,
PubkeyG2: nil,
Socket: "localhost:8080",
})
assert.NoError(t, err)
assert.Equal(t, signature, sigReply)
}
3 changes: 3 additions & 0 deletions disperser/disperser.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/Layr-Labs/eigenda/encoding"

disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser"
"github.com/Layr-Labs/eigenda/api/grpc/node"
gcommon "github.com/ethereum/go-ethereum/common"
)

Expand Down Expand Up @@ -178,6 +179,8 @@ type BlobStore interface {
type Dispatcher interface {
DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SigningMessage
SendBlobsToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error)
AttestBatch(ctx context.Context, state *core.IndexedOperatorState, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) (chan core.SigningMessage, error)
SendAttestBatchRequest(ctx context.Context, nodeDispersalClient node.DispersalClient, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error)
}

// GenerateReverseIndexKey returns the key used to store the blob key in the reverse index
Expand Down
17 changes: 17 additions & 0 deletions disperser/mock/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"

"github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/Layr-Labs/eigenda/core"
coremock "github.com/Layr-Labs/eigenda/core/mock"
"github.com/Layr-Labs/eigenda/disperser"
Expand Down Expand Up @@ -72,3 +73,19 @@ func (d *Dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.Blob
}
return args.Get(0).([]*core.Signature), args.Error(1)
}

func (d *Dispatcher) AttestBatch(ctx context.Context, state *core.IndexedOperatorState, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) (chan core.SigningMessage, error) {
args := d.Called(ctx, state, blobHeaderHashes, batchHeader)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(chan core.SigningMessage), args.Error(1)
}

func (d *Dispatcher) SendAttestBatchRequest(ctx context.Context, nodeDispersalClient node.DispersalClient, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) {
args := d.Called(ctx, nodeDispersalClient, blobHeaderHashes, batchHeader, op)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*core.Signature), args.Error(1)
}
Loading