diff --git a/disperser/batcher/grpc/dispatcher.go b/disperser/batcher/grpc/dispatcher.go index 5807a6f40d..9c14a433b8 100644 --- a/disperser/batcher/grpc/dispatcher.go +++ b/disperser/batcher/grpc/dispatcher.go @@ -147,6 +147,55 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage, return sig, nil } +// SendBlobsToOperator sends blobs to an operator via the node's StoreBlobs endpoint +// It returns the signatures of the blobs sent to the operator in the same order as the blobs +// with nil values for blobs that were not attested by the operator +func (c *dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) { + // TODO Add secure Grpc + + conn, err := grpc.Dial( + core.OperatorSocket(op.Socket).GetDispersalSocket(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + c.logger.Warn("Disperser cannot connect to operator dispersal socket", "dispersal_socket", core.OperatorSocket(op.Socket).GetDispersalSocket(), "err", err) + return nil, err + } + defer conn.Close() + + gc := node.NewDispersalClient(conn) + ctx, cancel := context.WithTimeout(ctx, c.Timeout) + defer cancel() + start := time.Now() + request, totalSize, err := GetStoreBlobsRequest(blobs, batchHeader) + if err != nil { + return nil, err + } + c.logger.Debug("sending chunks to operator", "operator", op.Socket, "num blobs", len(blobs), "size", totalSize, "request message size", proto.Size(request), "request serialization time", time.Since(start)) + opt := grpc.MaxCallSendMsgSize(60 * 1024 * 1024 * 1024) + reply, err := gc.StoreBlobs(ctx, request, opt) + + if err != nil { + return nil, err + } + + signaturesInBytes := reply.GetSignatures() + signatures := make([]*core.Signature, len(signaturesInBytes)) + for _, sigBytes := range signaturesInBytes { + sig := sigBytes.GetValue() + if sig != nil { + point, err := new(core.Signature).Deserialize(sig) + if err != nil { + return nil, err + } + signatures = append(signatures, &core.Signature{G1Point: point}) + } else { + signatures = append(signatures, nil) + } + } + return signatures, nil +} + func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader) (*node.StoreChunksRequest, int64, error) { blobs := make([]*node.Blob, len(blobMessages)) totalSize := int64(0) @@ -167,6 +216,26 @@ func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.B return request, totalSize, nil } +func GetStoreBlobsRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader) (*node.StoreBlobsRequest, int64, error) { + blobs := make([]*node.Blob, len(blobMessages)) + totalSize := int64(0) + for i, blob := range blobMessages { + var err error + blobs[i], err = getBlobMessage(blob) + if err != nil { + return nil, 0, err + } + totalSize += int64(blob.Bundles.Size()) + } + + request := &node.StoreBlobsRequest{ + Blobs: blobs, + ReferenceBlockNumber: uint32(batchHeader.ReferenceBlockNumber), + } + + return request, totalSize, nil +} + func getBlobMessage(blob *core.BlobMessage) (*node.Blob, error) { if blob.BlobHeader == nil { return nil, errors.New("blob header is nil") diff --git a/disperser/disperser.go b/disperser/disperser.go index 78d078e594..f9ea58782c 100644 --- a/disperser/disperser.go +++ b/disperser/disperser.go @@ -177,6 +177,7 @@ type BlobStore interface { type Dispatcher interface { DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SigningMessage + SendBlobsToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) } // GenerateReverseIndexKey returns the key used to store the blob key in the reverse index diff --git a/disperser/mock/dispatcher.go b/disperser/mock/dispatcher.go index 59d13f686e..8569e9779b 100644 --- a/disperser/mock/dispatcher.go +++ b/disperser/mock/dispatcher.go @@ -64,3 +64,11 @@ func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOpera return update } + +func (d *Dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) { + args := d.Called(ctx, blobs, batchHeader, op) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).([]*core.Signature), args.Error(1) +}