Skip to content

Commit

Permalink
node v2 storechunks endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Nov 12, 2024
1 parent 36bd63b commit ab183e3
Show file tree
Hide file tree
Showing 10 changed files with 600 additions and 44 deletions.
2 changes: 1 addition & 1 deletion api/clients/mock/relay_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (c *MockRelayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey,
}

func (c *MockRelayClient) GetChunksByRange(ctx context.Context, relayKey corev2.RelayKey, requests []*clients.ChunkRequestByRange) ([][]byte, error) {
args := c.Called()
args := c.Called(ctx, relayKey, requests)
return args.Get(0).([][]byte), args.Error(1)
}

Expand Down
6 changes: 3 additions & 3 deletions core/v2/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,14 @@ func prepareBlobs(
}
if len(inverseMap[operatorID]) < blobIndex+1 {
inverseMap[operatorID] = append(inverseMap[operatorID], &corev2.BlobShard{
BlobCertificate: certs[blobIndex],
Chunks: make(map[core.QuorumID][]*encoding.Frame),
BlobCertificate: &certs[blobIndex],
Bundles: make(map[core.QuorumID]core.Bundle),
})
}
if len(frames) == 0 {
continue
}
inverseMap[operatorID][blobIndex].Chunks[quorum] = append(inverseMap[operatorID][blobIndex].Chunks[quorum], frames...)
inverseMap[operatorID][blobIndex].Bundles[quorum] = append(inverseMap[operatorID][blobIndex].Bundles[quorum], frames...)

}
}
Expand Down
71 changes: 71 additions & 0 deletions core/v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,77 @@ type Batch struct {
BlobCertificates []*BlobCertificate
}

func (b *Batch) ToProtobuf() (*commonpb.Batch, error) {
if b.BatchHeader == nil {
return nil, fmt.Errorf("batch header is nil")
}

if b.BatchHeader.BatchRoot == [32]byte{} {
return nil, fmt.Errorf("batch root is empty")
}

if b.BatchHeader.ReferenceBlockNumber == 0 {
return nil, fmt.Errorf("reference block number is 0")
}

blobCerts := make([]*commonpb.BlobCertificate, len(b.BlobCertificates))
for i, cert := range b.BlobCertificates {
blobCert, err := cert.ToProtobuf()
if err != nil {
return nil, fmt.Errorf("failed to convert blob certificate to protobuf: %v", err)
}
blobCerts[i] = blobCert
}

return &commonpb.Batch{
Header: &commonpb.BatchHeader{
BatchRoot: b.BatchHeader.BatchRoot[:],
ReferenceBlockNumber: b.BatchHeader.ReferenceBlockNumber,
},
BlobCertificates: blobCerts,
}, nil
}

func BatchFromProtobuf(proto *commonpb.Batch) (*Batch, error) {
if len(proto.GetBlobCertificates()) == 0 {
return nil, fmt.Errorf("missing blob certificates in batch")
}

if proto.GetHeader() == nil {
return nil, fmt.Errorf("missing header in batch")
}

if len(proto.GetHeader().GetBatchRoot()) != 32 {
return nil, fmt.Errorf("batch root must be 32 bytes")
}

batchHeader := &BatchHeader{
BatchRoot: [32]byte(proto.GetHeader().GetBatchRoot()),
ReferenceBlockNumber: proto.GetHeader().GetReferenceBlockNumber(),
}

blobCerts := make([]*BlobCertificate, len(proto.GetBlobCertificates()))
for i, cert := range proto.GetBlobCertificates() {
blobHeader, err := NewBlobHeader(cert.GetBlobHeader())
if err != nil {
return nil, fmt.Errorf("failed to create blob header: %v", err)
}

blobCerts[i] = &BlobCertificate{
BlobHeader: blobHeader,
RelayKeys: make([]RelayKey, len(cert.GetRelays())),
}
for j, r := range cert.GetRelays() {
blobCerts[i].RelayKeys[j] = RelayKey(r)
}
}

return &Batch{
BatchHeader: batchHeader,
BlobCertificates: blobCerts,
}, nil
}

type Attestation struct {
*BatchHeader

Expand Down
67 changes: 67 additions & 0 deletions core/v2/types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package v2_test

import (
"math/big"
"testing"

"github.com/Layr-Labs/eigenda/core"
v2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/encoding/utils/codec"
"github.com/stretchr/testify/assert"
)

func TestConvertBatchToFromProtobuf(t *testing.T) {
data := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES)
commitments, err := p.GetCommitments(data)
if err != nil {
t.Fatal(err)
}

bh0 := &v2.BlobHeader{
BlobVersion: 0,
BlobCommitments: commitments,
QuorumNumbers: []core.QuorumID{0, 1},
PaymentMetadata: core.PaymentMetadata{
AccountID: "0x123",
BinIndex: 5,
CumulativePayment: big.NewInt(100),
},
Signature: []byte{1, 2, 3},
}
bh1 := &v2.BlobHeader{
BlobVersion: 0,
BlobCommitments: commitments,
QuorumNumbers: []core.QuorumID{0, 1},
PaymentMetadata: core.PaymentMetadata{
AccountID: "0x456",
BinIndex: 6,
CumulativePayment: big.NewInt(200),
},
Signature: []byte{1, 2, 3},
}

blobCert0 := &v2.BlobCertificate{
BlobHeader: bh0,
RelayKeys: []v2.RelayKey{0, 1},
}
blobCert1 := &v2.BlobCertificate{
BlobHeader: bh1,
RelayKeys: []v2.RelayKey{2, 3},
}

batch := &v2.Batch{
BatchHeader: &v2.BatchHeader{
BatchRoot: [32]byte{1, 1, 1},
ReferenceBlockNumber: 100,
},
BlobCertificates: []*v2.BlobCertificate{blobCert0, blobCert1},
}

pb, err := batch.ToProtobuf()
assert.NoError(t, err)

newBatch, err := v2.BatchFromProtobuf(pb)
assert.NoError(t, err)

assert.Equal(t, batch, newBatch)
}
17 changes: 8 additions & 9 deletions core/v2/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ var (
)

type BlobShard struct {
BlobCertificate
Chunks map[core.QuorumID][]*encoding.Frame
*BlobCertificate
Bundles map[core.QuorumID]core.Bundle
}

// shardValidator implements the validation logic that a DA node should apply to its received data
Expand Down Expand Up @@ -52,18 +52,17 @@ func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar
if assignment.NumChunks == 0 {
return nil, nil, fmt.Errorf("%w: operator %s has no chunks in quorum %d", ErrBlobQuorumSkip, v.operatorID.Hex(), quorum)
}
if assignment.NumChunks != uint32(len(blob.Chunks[quorum])) {
return nil, nil, fmt.Errorf("number of chunks (%d) does not match assignment (%d) for quorum %d", len(blob.Chunks[quorum]), assignment.NumChunks, quorum)
if assignment.NumChunks != uint32(len(blob.Bundles[quorum])) {
return nil, nil, fmt.Errorf("number of chunks (%d) does not match assignment (%d) for quorum %d", len(blob.Bundles[quorum]), assignment.NumChunks, quorum)
}

// Validate the chunkLength against the confirmation and adversary threshold parameters
// Get the chunk length
chunkLength, err := GetChunkLength(blob.BlobHeader.BlobVersion, uint32(blob.BlobHeader.BlobCommitments.Length))
if err != nil {
return nil, nil, fmt.Errorf("invalid chunk length: %w", err)
}

// Get the chunk length
chunks := blob.Chunks[quorum]
chunks := blob.Bundles[quorum]
for _, chunk := range chunks {
if uint32(chunk.Length()) != chunkLength {
return nil, nil, fmt.Errorf("%w: chunk length (%d) does not match quorum header (%d) for quorum %d", ErrChunkLengthMismatch, chunk.Length(), chunkLength, quorum)
Expand All @@ -79,8 +78,8 @@ func (v *ShardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard,
blobCommitmentList := make([]encoding.BlobCommitments, len(blobs))

for k, blob := range blobs {
if len(blob.Chunks) != len(blob.BlobHeader.QuorumNumbers) {
return fmt.Errorf("number of bundles (%d) does not match number of quorums (%d)", len(blob.Chunks), len(blob.BlobHeader.QuorumNumbers))
if len(blob.Bundles) != len(blob.BlobHeader.QuorumNumbers) {
return fmt.Errorf("number of bundles (%d) does not match number of quorums (%d)", len(blob.Bundles), len(blob.BlobHeader.QuorumNumbers))
}

state, err := v.chainState.GetOperatorState(ctx, uint(referenceBlockNumber), blob.BlobHeader.QuorumNumbers)
Expand Down
31 changes: 17 additions & 14 deletions node/grpc/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package grpc
import (
"context"
"runtime"
"sync"

"github.com/Layr-Labs/eigenda/api"
pb "github.com/Layr-Labs/eigenda/api/grpc/node/v2"
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/node"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/shirou/gopsutil/mem"
Expand All @@ -18,23 +18,26 @@ type ServerV2 struct {
pb.UnimplementedDispersalServer
pb.UnimplementedRetrievalServer

node *node.Node
config *node.Config
logger logging.Logger

ratelimiter common.RateLimiter

mu *sync.Mutex
config *node.Config
node *node.Node
ratelimiter common.RateLimiter
assignmentCoordinator core.AssignmentCoordinator
logger logging.Logger
}

// NewServerV2 creates a new Server instance with the provided parameters.
func NewServerV2(config *node.Config, node *node.Node, logger logging.Logger, ratelimiter common.RateLimiter) *ServerV2 {
func NewServerV2(
config *node.Config,
node *node.Node,
logger logging.Logger,
ratelimiter common.RateLimiter,
) *ServerV2 {
return &ServerV2{
config: config,
logger: logger,
node: node,
ratelimiter: ratelimiter,
mu: &sync.Mutex{},
config: config,
node: node,
ratelimiter: ratelimiter,
assignmentCoordinator: &core.StdAssignmentCoordinator{},
logger: logger,
}
}

Expand Down
13 changes: 12 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/wealdtech/go-merkletree/v2/keccak256"
"google.golang.org/protobuf/proto"

"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/Layr-Labs/eigenda/common/geth"
"github.com/Layr-Labs/eigenda/core"
Expand Down Expand Up @@ -66,12 +67,19 @@ type Node struct {
OperatorSocketsFilterer indexer.OperatorSocketsFilterer
ChainID *big.Int

RelayClient clients.RelayClient

mu sync.Mutex
CurrentSocket string
}

// NewNode creates a new Node with the provided config.
func NewNode(reg *prometheus.Registry, config *Config, pubIPProvider pubip.Provider, logger logging.Logger) (*Node, error) {
func NewNode(
reg *prometheus.Registry,
config *Config,
pubIPProvider pubip.Provider,
logger logging.Logger,
) (*Node, error) {
// Setup metrics
// sdkClients, err := buildSdkClients(config, logger)
// if err != nil {
Expand Down Expand Up @@ -164,6 +172,8 @@ func NewNode(reg *prometheus.Registry, config *Config, pubIPProvider pubip.Provi
"quorumIDs", fmt.Sprint(config.QuorumIDList), "registerNodeAtStart", config.RegisterNodeAtStart, "pubIPCheckInterval", config.PubIPCheckInterval,
"eigenDAServiceManagerAddr", config.EigenDAServiceManagerAddr, "blockStaleMeasure", blockStaleMeasure, "storeDurationBlocks", storeDurationBlocks, "enableGnarkBundleEncoding", config.EnableGnarkBundleEncoding)

var relayClient clients.RelayClient
// Create a new relay client with relay addresses onchain
return &Node{
Config: config,
Logger: nodeLogger,
Expand All @@ -177,6 +187,7 @@ func NewNode(reg *prometheus.Registry, config *Config, pubIPProvider pubip.Provi
PubIPProvider: pubIPProvider,
OperatorSocketsFilterer: socketsFilterer,
ChainID: chainID,
RelayClient: relayClient,
}, nil
}

Expand Down
36 changes: 20 additions & 16 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package node_test

import (
"context"
"fmt"
"os"
"runtime"
"testing"
"time"

clientsmock "github.com/Layr-Labs/eigenda/api/clients/mock"
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/geth"
"github.com/Layr-Labs/eigenda/core"
Expand All @@ -17,12 +17,15 @@ import (
"github.com/stretchr/testify/mock"
)

var privateKey = "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"
var opID = [32]byte{}
var (
privateKey = "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"
opID = [32]byte{0}
)

type components struct {
node *node.Node
tx *coremock.MockWriter
node *node.Node
tx *coremock.MockWriter
relayClient *clientsmock.MockRelayClient
}

func newComponents(t *testing.T) *components {
Expand All @@ -31,7 +34,6 @@ func newComponents(t *testing.T) *components {
if err != nil {
panic("failed to create a BLS Key")
}
copy(opID[:], []byte(fmt.Sprintf("%d", 3)))
config := &node.Config{
Timeout: 10 * time.Second,
ExpirationPollIntervalSec: 1,
Expand Down Expand Up @@ -69,19 +71,21 @@ func newComponents(t *testing.T) *components {
panic("failed to create a new levelDB store")
}
defer os.Remove(dbPath)

relayClient := clientsmock.NewRelayClient()
return &components{
node: &node.Node{
Config: config,
Logger: logger,
KeyPair: keyPair,
Metrics: nil,
Store: store,
ChainState: chainState,
Validator: mockVal,
Transactor: tx,
Config: config,
Logger: logger,
KeyPair: keyPair,
Metrics: nil,
Store: store,
ChainState: chainState,
Validator: mockVal,
Transactor: tx,
RelayClient: relayClient,
},
tx: tx,
tx: tx,
relayClient: relayClient,
}
}

Expand Down
Loading

0 comments on commit ab183e3

Please sign in to comment.