diff --git a/core/v2/core_test.go b/core/v2/core_test.go index 83c26a52b9..6184313771 100644 --- a/core/v2/core_test.go +++ b/core/v2/core_test.go @@ -205,14 +205,12 @@ func checkBatchByUniversalVerifier( cst core.IndexedChainState, packagedBlobs map[core.OperatorID][]*corev2.BlobShard, pool common.WorkerPool, - referenceBlockNumber uint64, ) error { ctx := context.Background() quorums := []core.QuorumID{0, 1} state, _ := cst.GetIndexedOperatorState(context.Background(), 0, quorums) - // numBlob := len(encodedBlobs) var errList *multierror.Error @@ -222,7 +220,7 @@ func checkBatchByUniversalVerifier( blobs := packagedBlobs[id] - err := val.ValidateBlobs(ctx, blobs, pool, referenceBlockNumber) + err := val.ValidateBlobs(ctx, blobs, pool, state.OperatorState) if err != nil { errList = multierror.Append(errList, err) } @@ -268,7 +266,7 @@ func TestValidationSucceeds(t *testing.T) { packagedBlobs, cst := prepareBlobs(t, operatorCount, headers, blobs, bn) t.Run(fmt.Sprintf("universal verifier operatorCount=%v over %v blobs", operatorCount, len(blobs)), func(t *testing.T) { - err := checkBatchByUniversalVerifier(cst, packagedBlobs, pool, bn) + err := checkBatchByUniversalVerifier(cst, packagedBlobs, pool) assert.NoError(t, err) }) diff --git a/core/v2/serialization.go b/core/v2/serialization.go index e9b94be813..1a67f2ab6d 100644 --- a/core/v2/serialization.go +++ b/core/v2/serialization.go @@ -1,6 +1,8 @@ package v2 import ( + "bytes" + "encoding/gob" "fmt" "math/big" @@ -218,6 +220,19 @@ func (c *BlobCertificate) Hash() ([32]byte, error) { return blobCertHash, nil } +func (c *BlobCertificate) Serialize() ([]byte, error) { + return encode(c) +} + +func DeserializeBlobCertificate(data []byte) (*BlobCertificate, error) { + var c BlobCertificate + err := decode(data, &c) + if err != nil { + return nil, err + } + return &c, nil +} + // GetBatchHeaderHash returns the hash of the batch header func (h BatchHeader) Hash() ([32]byte, error) { var headerHash [32]byte @@ -263,3 +278,36 @@ func (h BatchHeader) Hash() ([32]byte, error) { return headerHash, nil } + +func (h BatchHeader) Serialize() ([]byte, error) { + return encode(h) +} + +func DeserializeBatchHeader(data []byte) (*BatchHeader, error) { + var h BatchHeader + err := decode(data, &h) + if err != nil { + return nil, err + } + return &h, nil +} + +func encode(obj any) ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err := enc.Encode(obj) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func decode(data []byte, obj any) error { + buf := bytes.NewBuffer(data) + dec := gob.NewDecoder(buf) + err := dec.Decode(obj) + if err != nil { + return err + } + return nil +} diff --git a/core/v2/serialization_test.go b/core/v2/serialization_test.go index e9767de95e..36b57ee79e 100644 --- a/core/v2/serialization_test.go +++ b/core/v2/serialization_test.go @@ -70,6 +70,21 @@ func TestBatchHeaderHash(t *testing.T) { assert.Equal(t, "891d0936da4627f445ef193aad63afb173409af9e775e292e4e35aff790a45e2", hex.EncodeToString(hash[:])) } +func TestBatchHeaderSerialization(t *testing.T) { + batchRoot := [32]byte{} + copy(batchRoot[:], []byte("batchRoot")) + batchHeader := &v2.BatchHeader{ + ReferenceBlockNumber: 1000, + BatchRoot: batchRoot, + } + + serialized, err := batchHeader.Serialize() + assert.NoError(t, err) + deserialized, err := v2.DeserializeBatchHeader(serialized) + assert.NoError(t, err) + assert.Equal(t, batchHeader, deserialized) +} + func TestBlobCertHash(t *testing.T) { data := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES) commitments, err := p.GetCommitments(data) @@ -97,3 +112,32 @@ func TestBlobCertHash(t *testing.T) { // 0xc4512b8702f69cb837fff50a93d3d28aada535b1f151b64db45859c3f5bb096a verified in solidity assert.Equal(t, "c4512b8702f69cb837fff50a93d3d28aada535b1f151b64db45859c3f5bb096a", hex.EncodeToString(hash[:])) } + +func TestBlobCertSerialization(t *testing.T) { + data := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES) + commitments, err := p.GetCommitments(data) + if err != nil { + t.Fatal(err) + } + + blobCert := &v2.BlobCertificate{ + BlobHeader: &v2.BlobHeader{ + BlobVersion: 0, + BlobCommitments: commitments, + QuorumNumbers: []core.QuorumID{0, 1}, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x123", + BinIndex: 5, + CumulativePayment: big.NewInt(100), + }, + Signature: []byte{1, 2, 3}, + }, + RelayKeys: []v2.RelayKey{4, 5, 6}, + } + + serialized, err := blobCert.Serialize() + assert.NoError(t, err) + deserialized, err := v2.DeserializeBlobCertificate(serialized) + assert.NoError(t, err) + assert.Equal(t, blobCert, deserialized) +} diff --git a/core/v2/types.go b/core/v2/types.go index 2b6a0b963c..1b9ab8a915 100644 --- a/core/v2/types.go +++ b/core/v2/types.go @@ -202,7 +202,9 @@ func (c *BlobCertificate) ToProtobuf() (*commonpb.BlobCertificate, error) { } type BatchHeader struct { - BatchRoot [32]byte + // BatchRoot is the root of a Merkle tree whose leaves are the keys of the blobs in the batch + BatchRoot [32]byte + // ReferenceBlockNumber is the block number at which all operator information (stakes, indexes, etc.) is taken from ReferenceBlockNumber uint64 } diff --git a/core/v2/validator.go b/core/v2/validator.go index 5e570542cd..3942480d63 100644 --- a/core/v2/validator.go +++ b/core/v2/validator.go @@ -72,7 +72,7 @@ func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar return chunks, &assignment, nil } -func (v *ShardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard, pool common.WorkerPool, referenceBlockNumber uint64) error { +func (v *ShardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard, pool common.WorkerPool, state *core.OperatorState) error { var err error subBatchMap := make(map[encoding.EncodingParams]*encoding.SubBatch) blobCommitmentList := make([]encoding.BlobCommitments, len(blobs)) @@ -82,10 +82,10 @@ func (v *ShardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard, return fmt.Errorf("number of bundles (%d) does not match number of quorums (%d)", len(blob.Bundles), len(blob.BlobHeader.QuorumNumbers)) } - state, err := v.chainState.GetOperatorState(ctx, uint(referenceBlockNumber), blob.BlobHeader.QuorumNumbers) - if err != nil { - return err - } + // state, err := v.chainState.GetOperatorState(ctx, uint(referenceBlockNumber), blob.BlobHeader.QuorumNumbers) + // if err != nil { + // return err + // } // Saved for the blob length validation blobCommitmentList[k] = blob.BlobHeader.BlobCommitments diff --git a/node/grpc/server_v2.go b/node/grpc/server_v2.go index 7dc7ac53ec..ae2902255b 100644 --- a/node/grpc/server_v2.go +++ b/node/grpc/server_v2.go @@ -58,7 +58,3 @@ func (s *ServerV2) NodeInfo(ctx context.Context, in *pb.NodeInfoRequest) (*pb.No func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*pb.StoreChunksReply, error) { return &pb.StoreChunksReply{}, api.NewErrorUnimplemented() } - -func (s *ServerV2) GetChunks(context.Context, *pb.GetChunksRequest) (*pb.GetChunksReply, error) { - return &pb.GetChunksReply{}, api.NewErrorUnimplemented() -} diff --git a/node/node_v2_test.go b/node/node_v2_test.go index 4e461fe3af..72a38f1ebe 100644 --- a/node/node_v2_test.go +++ b/node/node_v2_test.go @@ -16,9 +16,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestDownloadBatch(t *testing.T) { - c := newComponents(t) - ctx := context.Background() +func mockBatch(t *testing.T) ([]v2.BlobKey, *v2.Batch, []map[core.QuorumID]core.Bundle) { commitments := mockCommitment(t) bh0 := &v2.BlobHeader{ BlobVersion: 0, @@ -157,63 +155,70 @@ func TestDownloadBatch(t *testing.T) { }, } - batch := &v2.Batch{ + return []v2.BlobKey{blobKey0, blobKey1, blobKey2}, &v2.Batch{ BatchHeader: &v2.BatchHeader{ BatchRoot: [32]byte{1, 1, 1}, ReferenceBlockNumber: 100, }, BlobCertificates: []*v2.BlobCertificate{blobCert0, blobCert1, blobCert2}, - } + }, []map[core.QuorumID]core.Bundle{bundles0, bundles1, bundles2} +} + +func TestDownloadBatch(t *testing.T) { + c := newComponents(t) + ctx := context.Background() + blobKeys, batch, bundles := mockBatch(t) + blobCerts := batch.BlobCertificates - bundles00Bytes, err := bundles0[0].Serialize() + bundles00Bytes, err := bundles[0][0].Serialize() require.NoError(t, err) - bundles01Bytes, err := bundles0[1].Serialize() + bundles01Bytes, err := bundles[0][1].Serialize() require.NoError(t, err) - bundles10Bytes, err := bundles1[0].Serialize() + bundles10Bytes, err := bundles[1][0].Serialize() require.NoError(t, err) - bundles11Bytes, err := bundles1[1].Serialize() + bundles11Bytes, err := bundles[1][1].Serialize() require.NoError(t, err) - bundles21Bytes, err := bundles2[1].Serialize() + bundles21Bytes, err := bundles[2][1].Serialize() require.NoError(t, err) - bundles22Bytes, err := bundles2[2].Serialize() + bundles22Bytes, err := bundles[2][2].Serialize() require.NoError(t, err) c.relayClient.On("GetChunksByRange", mock.Anything, v2.RelayKey(0), mock.Anything).Return([][]byte{bundles00Bytes, bundles01Bytes, bundles21Bytes, bundles22Bytes}, nil).Run(func(args mock.Arguments) { requests := args.Get(2).([]*clients.ChunkRequestByRange) require.Len(t, requests, 4) - require.Equal(t, blobKey0, requests[0].BlobKey) - require.Equal(t, blobKey0, requests[1].BlobKey) - require.Equal(t, blobKey2, requests[2].BlobKey) - require.Equal(t, blobKey2, requests[3].BlobKey) + require.Equal(t, blobKeys[0], requests[0].BlobKey) + require.Equal(t, blobKeys[0], requests[1].BlobKey) + require.Equal(t, blobKeys[2], requests[2].BlobKey) + require.Equal(t, blobKeys[2], requests[3].BlobKey) }) c.relayClient.On("GetChunksByRange", mock.Anything, v2.RelayKey(1), mock.Anything).Return([][]byte{bundles10Bytes, bundles11Bytes}, nil).Run(func(args mock.Arguments) { requests := args.Get(2).([]*clients.ChunkRequestByRange) require.Len(t, requests, 2) - require.Equal(t, blobKey1, requests[0].BlobKey) - require.Equal(t, blobKey1, requests[1].BlobKey) + require.Equal(t, blobKeys[1], requests[0].BlobKey) + require.Equal(t, blobKeys[1], requests[1].BlobKey) }) blobShards, rawBundles, err := c.node.DownloadBatch(ctx, batch) require.NoError(t, err) require.Len(t, blobShards, 3) - require.Equal(t, blobCert0, blobShards[0].BlobCertificate) - require.Equal(t, blobCert1, blobShards[1].BlobCertificate) - require.Equal(t, blobCert2, blobShards[2].BlobCertificate) + require.Equal(t, blobCerts[0], blobShards[0].BlobCertificate) + require.Equal(t, blobCerts[1], blobShards[1].BlobCertificate) + require.Equal(t, blobCerts[2], blobShards[2].BlobCertificate) require.Contains(t, blobShards[0].Bundles, core.QuorumID(0)) require.Contains(t, blobShards[0].Bundles, core.QuorumID(1)) require.Contains(t, blobShards[1].Bundles, core.QuorumID(0)) require.Contains(t, blobShards[1].Bundles, core.QuorumID(1)) require.Contains(t, blobShards[2].Bundles, core.QuorumID(1)) require.Contains(t, blobShards[2].Bundles, core.QuorumID(2)) - bundleEqual(t, bundles0[0], blobShards[0].Bundles[0]) - bundleEqual(t, bundles0[1], blobShards[0].Bundles[1]) - bundleEqual(t, bundles1[0], blobShards[1].Bundles[0]) - bundleEqual(t, bundles1[1], blobShards[1].Bundles[1]) - bundleEqual(t, bundles2[1], blobShards[2].Bundles[1]) - bundleEqual(t, bundles2[2], blobShards[2].Bundles[2]) + bundleEqual(t, bundles[0][0], blobShards[0].Bundles[0]) + bundleEqual(t, bundles[0][1], blobShards[0].Bundles[1]) + bundleEqual(t, bundles[1][0], blobShards[1].Bundles[0]) + bundleEqual(t, bundles[1][1], blobShards[1].Bundles[1]) + bundleEqual(t, bundles[2][1], blobShards[2].Bundles[1]) + bundleEqual(t, bundles[2][2], blobShards[2].Bundles[2]) require.Len(t, rawBundles, 3) - require.Equal(t, blobCert0, rawBundles[0].BlobCertificate) - require.Equal(t, blobCert1, rawBundles[1].BlobCertificate) - require.Equal(t, blobCert2, rawBundles[2].BlobCertificate) + require.Equal(t, blobCerts[0], rawBundles[0].BlobCertificate) + require.Equal(t, blobCerts[1], rawBundles[1].BlobCertificate) + require.Equal(t, blobCerts[2], rawBundles[2].BlobCertificate) require.Contains(t, rawBundles[0].Bundles, core.QuorumID(0)) require.Contains(t, rawBundles[0].Bundles, core.QuorumID(1)) require.Contains(t, rawBundles[1].Bundles, core.QuorumID(0)) diff --git a/node/store_v2.go b/node/store_v2.go new file mode 100644 index 0000000000..c7b02ac6e8 --- /dev/null +++ b/node/store_v2.go @@ -0,0 +1,102 @@ +package node + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "time" + + "github.com/Layr-Labs/eigenda/common/kvstore" + "github.com/Layr-Labs/eigenda/core" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigensdk-go/logging" +) + +const ( + BatchHeaderTableName = "batch_headers" + BlobCertificateTableName = "blob_certificates" + BundleTableName = "bundles" +) + +type StoreV2 struct { + db kvstore.TableStore + logger logging.Logger + + ttl time.Duration +} + +func NewLevelDBStoreV2(db kvstore.TableStore, logger logging.Logger) *StoreV2 { + return &StoreV2{ + db: db, + logger: logger, + } +} + +func (s *StoreV2) StoreBatch(ctx context.Context, batch *corev2.Batch, rawBundles []*RawBundles) error { + dbBatch := s.db.NewTTLBatch() + + batchHeaderKeyBuilder, err := s.db.GetKeyBuilder(BatchHeaderTableName) + if err != nil { + return fmt.Errorf("failed to get key builder for batch header: %v", err) + } + + batchHeaderHash, err := batch.BatchHeader.Hash() + if err != nil { + return fmt.Errorf("failed to hash batch header: %v", err) + } + + // Store batch header + batchHeaderBytes, err := batch.BatchHeader.Serialize() + if err != nil { + return fmt.Errorf("failed to serialize batch header: %v", err) + } + + batchHeaderKey := batchHeaderKeyBuilder.Key(batchHeaderHash[:]) + dbBatch.PutWithTTL(batchHeaderKey, batchHeaderBytes, s.ttl) + + // Store blob shards + for _, bundles := range rawBundles { + // Store blob certificate + blobCertificateKeyBuilder, err := s.db.GetKeyBuilder(BlobCertificateTableName) + if err != nil { + return fmt.Errorf("failed to get key builder for blob certificate: %v", err) + } + blobKey, err := bundles.BlobCertificate.BlobHeader.BlobKey() + if err != nil { + return fmt.Errorf("failed to get blob key: %v", err) + } + blobCertificateKey := blobCertificateKeyBuilder.Key(blobKey[:]) + blobCertificateBytes, err := bundles.BlobCertificate.Serialize() + if err != nil { + return fmt.Errorf("failed to serialize blob certificate: %v", err) + } + dbBatch.PutWithTTL(blobCertificateKey, blobCertificateBytes, s.ttl) + + // Store bundles + for quorum, bundle := range bundles.Bundles { + bundlesKeyBuilder, err := s.db.GetKeyBuilder(BundleTableName) + if err != nil { + return fmt.Errorf("failed to get key builder for bundles: %v", err) + } + + k, err := BundleKey(blobKey, quorum) + if err != nil { + return fmt.Errorf("failed to get key for bundles: %v", err) + } + + dbBatch.PutWithTTL(bundlesKeyBuilder.Key(k), bundle, s.ttl) + } + } + + return dbBatch.Apply() +} + +func BundleKey(blobKey corev2.BlobKey, quorumID core.QuorumID) ([]byte, error) { + buf := bytes.NewBuffer(blobKey[:]) + err := binary.Write(buf, binary.LittleEndian, quorumID) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} diff --git a/node/store_v2_test.go b/node/store_v2_test.go new file mode 100644 index 0000000000..b64b936ecb --- /dev/null +++ b/node/store_v2_test.go @@ -0,0 +1,103 @@ +package node_test + +import ( + "context" + "testing" + + "github.com/Layr-Labs/eigenda/common/kvstore" + "github.com/Layr-Labs/eigenda/common/kvstore/tablestore" + "github.com/Layr-Labs/eigenda/core" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/node" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestStoreBatchV2(t *testing.T) { + ctx := context.Background() + _, batch, bundles := mockBatch(t) + + blobShards := make([]*corev2.BlobShard, len(batch.BlobCertificates)) + rawBundles := make([]*node.RawBundles, len(batch.BlobCertificates)) + for i, cert := range batch.BlobCertificates { + blobShards[i] = &corev2.BlobShard{ + BlobCertificate: cert, + Bundles: make(map[core.QuorumID]core.Bundle), + } + rawBundles[i] = &node.RawBundles{ + BlobCertificate: cert, + Bundles: make(map[core.QuorumID][]byte), + } + + for quorum, bundle := range bundles[i] { + blobShards[i].Bundles[quorum] = bundle + bundleBytes, err := bundle.Serialize() + assert.NoError(t, err) + rawBundles[i].Bundles[quorum] = bundleBytes + } + } + + s, db := createStoreV2(t) + defer func() { + _ = db.Shutdown() + }() + err := s.StoreBatch(ctx, batch, rawBundles) + require.NoError(t, err) + + tables := db.GetTables() + require.ElementsMatch(t, []string{node.BatchHeaderTableName, node.BlobCertificateTableName, node.BundleTableName}, tables) + + // Check batch header + bhh, err := batch.BatchHeader.Hash() + require.NoError(t, err) + batchHeaderKeyBuilder, err := db.GetKeyBuilder(node.BatchHeaderTableName) + require.NoError(t, err) + bhhBytes, err := db.Get(batchHeaderKeyBuilder.Key(bhh[:])) + require.NoError(t, err) + assert.NotNil(t, bhhBytes) + deserializedBatchHeader, err := corev2.DeserializeBatchHeader(bhhBytes) + require.NoError(t, err) + assert.Equal(t, batch.BatchHeader, deserializedBatchHeader) + + // Check blob certificates + blobCertKeyBuilder, err := db.GetKeyBuilder(node.BlobCertificateTableName) + require.NoError(t, err) + for _, cert := range batch.BlobCertificates { + blobKey, err := cert.BlobHeader.BlobKey() + require.NoError(t, err) + blobCertKey := blobCertKeyBuilder.Key(blobKey[:]) + blobCertBytes, err := db.Get(blobCertKey) + require.NoError(t, err) + assert.NotNil(t, blobCertBytes) + deserializedBlobCert, err := corev2.DeserializeBlobCertificate(blobCertBytes) + require.NoError(t, err) + assert.Equal(t, cert, deserializedBlobCert) + } + + // Check bundles + bundleKeyBuilder, err := db.GetKeyBuilder(node.BundleTableName) + require.NoError(t, err) + for _, bundles := range rawBundles { + blobKey, err := bundles.BlobCertificate.BlobHeader.BlobKey() + require.NoError(t, err) + for quorum, bundle := range bundles.Bundles { + k, err := node.BundleKey(blobKey, quorum) + require.NoError(t, err) + bundleBytes, err := db.Get(bundleKeyBuilder.Key(k)) + require.NoError(t, err) + assert.NotNil(t, bundleBytes) + require.Equal(t, bundle, bundleBytes) + } + } +} + +func createStoreV2(t *testing.T) (*node.StoreV2, kvstore.TableStore) { + logger := logging.NewNoopLogger() + config := tablestore.DefaultLevelDBConfig(t.TempDir()) + config.Schema = []string{node.BatchHeaderTableName, node.BlobCertificateTableName, node.BundleTableName} + tStore, err := tablestore.Start(logger, config) + require.NoError(t, err) + s := node.NewLevelDBStoreV2(tStore, logger) + return s, tStore +}