Commit
Add BlobMinibatchMapping in minibatch store (#683)
ian-shim authored Aug 13, 2024
1 parent 448800d commit 93d776f
Showing 5 changed files with 375 additions and 10 deletions.
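For context: this commit stores one BlobMinibatchMapping per blob under the batch's partition key, using a BLOB_MINIBATCH_MAPPING# sort-key prefix, and adds a BlobMinibatchMappingIndex global secondary index on BlobHash so mappings can be looked up by blob. The following is a minimal usage sketch, not part of the commit; it assumes the batcher.MinibatchStore interface (defined in one of the changed files not shown in this excerpt) was extended with the same method signatures that both implementations below add.

package example

import (
	"context"

	"github.com/Layr-Labs/eigenda/disperser/batcher"
)

// storeAndFetchMapping is an illustrative sketch: it writes one blob-to-minibatch
// mapping and reads it back by blob key. On the DynamoDB-backed store the read
// goes through the new BlobMinibatchMappingIndex GSI.
func storeAndFetchMapping(ctx context.Context, store batcher.MinibatchStore, m *batcher.BlobMinibatchMapping) ([]*batcher.BlobMinibatchMapping, error) {
	if err := store.PutBlobMinibatchMappings(ctx, []*batcher.BlobMinibatchMapping{m}); err != nil {
		return nil, err
	}
	return store.GetBlobMinibatchMappings(ctx, *m.BlobKey)
}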
127 changes: 122 additions & 5 deletions disperser/batcher/batchstore/minibatch_store.go
@@ -8,6 +8,7 @@ import (

commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/batcher"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/aws/aws-sdk-go-v2/aws"
@@ -18,11 +19,13 @@ import (
)

const (
batchStatusIndexName = "BatchStatusIndex"
batchSKPrefix = "BATCH#"
minibatchSKPrefix = "MINIBATCH#"
dispersalRequestSKPrefix = "DISPERSAL_REQUEST#"
dispersalResponseSKPrefix = "DISPERSAL_RESPONSE#"
batchStatusIndexName = "BatchStatusIndex"
blobMinibatchMappingIndexName = "BlobMinibatchMappingIndex"
batchSKPrefix = "BATCH#"
minibatchSKPrefix = "MINIBATCH#"
dispersalRequestSKPrefix = "DISPERSAL_REQUEST#"
dispersalResponseSKPrefix = "DISPERSAL_RESPONSE#"
blobMinibatchMappingSKPrefix = "BLOB_MINIBATCH_MAPPING#"
)

type MinibatchStore struct {
@@ -63,6 +66,10 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit
AttributeName: aws.String("CreatedAt"),
AttributeType: types.ScalarAttributeTypeN,
},
{
AttributeName: aws.String("BlobHash"),
AttributeType: types.ScalarAttributeTypeS,
},
},
KeySchema: []types.KeySchemaElement{
{
Expand Down Expand Up @@ -96,6 +103,26 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit
WriteCapacityUnits: aws.Int64(writeCapacityUnits),
},
},
{
IndexName: aws.String(blobMinibatchMappingIndexName),
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("BlobHash"),
KeyType: types.KeyTypeHash,
},
{
AttributeName: aws.String("SK"),
KeyType: types.KeyTypeRange,
},
},
Projection: &types.Projection{
ProjectionType: types.ProjectionTypeAll,
},
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(readCapacityUnits),
WriteCapacityUnits: aws.Int64(writeCapacityUnits),
},
},
},
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(readCapacityUnits),
@@ -150,6 +177,18 @@ func MarshalDispersalResponse(response *batcher.DispersalResponse) (map[string]t
return fields, nil
}

func MarshalBlobMinibatchMapping(blobMinibatchMapping *batcher.BlobMinibatchMapping) (map[string]types.AttributeValue, error) {
fields, err := attributevalue.MarshalMap(*blobMinibatchMapping)
if err != nil {
return nil, err
}
fields["BatchID"] = &types.AttributeValueMemberS{Value: blobMinibatchMapping.BatchID.String()}
fields["SK"] = &types.AttributeValueMemberS{Value: blobMinibatchMappingSKPrefix + fmt.Sprintf("%s#%s#%d", blobMinibatchMapping.BlobKey.MetadataHash, blobMinibatchMapping.BatchID, blobMinibatchMapping.BlobIndex)}
fields["BlobHash"] = &types.AttributeValueMemberS{Value: blobMinibatchMapping.BlobKey.BlobHash}
fields["MetadataHash"] = &types.AttributeValueMemberS{Value: blobMinibatchMapping.BlobKey.MetadataHash}
return fields, nil
}

func UnmarshalBatchID(item commondynamodb.Item) (*uuid.UUID, error) {
type BatchID struct {
BatchID string
@@ -169,6 +208,16 @@ func UnmarshalBatchID(item commondynamodb.Item) (*uuid.UUID, error) {
return &batchID, nil
}

func UnmarshalBlobKey(item commondynamodb.Item) (*disperser.BlobKey, error) {
blobKey := disperser.BlobKey{}
err := attributevalue.UnmarshalMap(item, &blobKey)
if err != nil {
return nil, err
}

return &blobKey, nil
}

func UnmarshalOperatorID(item commondynamodb.Item) (*core.OperatorID, error) {
type OperatorID struct {
OperatorID string
@@ -268,6 +317,28 @@ func UnmarshalDispersalResponse(item commondynamodb.Item) (*batcher.DispersalRes
return &response, nil
}

func UnmarshalBlobMinibatchMapping(item commondynamodb.Item) (*batcher.BlobMinibatchMapping, error) {
blobMinibatchMapping := batcher.BlobMinibatchMapping{}
err := attributevalue.UnmarshalMap(item, &blobMinibatchMapping)
if err != nil {
return nil, err
}

batchID, err := UnmarshalBatchID(item)
if err != nil {
return nil, err
}
blobMinibatchMapping.BatchID = *batchID

blobKey, err := UnmarshalBlobKey(item)
if err != nil {
return nil, err
}
blobMinibatchMapping.BlobKey = blobKey

return &blobMinibatchMapping, nil
}

func (m *MinibatchStore) PutBatch(ctx context.Context, batch *batcher.BatchRecord) error {
item, err := MarshalBatchRecord(batch)
if err != nil {
@@ -304,6 +375,27 @@ func (m *MinibatchStore) PutDispersalResponse(ctx context.Context, response *bat
return m.dynamoDBClient.PutItem(ctx, m.tableName, item)
}

func (m *MinibatchStore) PutBlobMinibatchMappings(ctx context.Context, blobMinibatchMappings []*batcher.BlobMinibatchMapping) error {
items := make([]map[string]types.AttributeValue, len(blobMinibatchMappings))
var err error
for i, blobMinibatchMapping := range blobMinibatchMappings {
items[i], err = MarshalBlobMinibatchMapping(blobMinibatchMapping)
if err != nil {
return err
}
}

failedItems, err := m.dynamoDBClient.PutItems(ctx, m.tableName, items)
if err != nil {
return err
}
if len(failedItems) > 0 {
return fmt.Errorf("failed to put blob minibatch mappings: %v", failedItems)
}

return nil
}

func (m *MinibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batcher.BatchRecord, error) {
item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{
"BatchID": &types.AttributeValueMemberS{
@@ -618,3 +710,28 @@ func (m *MinibatchStore) GetMinibatchDispersalResponses(ctx context.Context, bat

return responses, nil
}

func (m *MinibatchStore) GetBlobMinibatchMappings(ctx context.Context, blobKey disperser.BlobKey) ([]*batcher.BlobMinibatchMapping, error) {
items, err := m.dynamoDBClient.QueryIndex(ctx, m.tableName, blobMinibatchMappingIndexName, "BlobHash = :blobHash AND begins_with(SK, :prefix)", commondynamodb.ExpresseionValues{
":blobHash": &types.AttributeValueMemberS{
Value: blobKey.BlobHash,
},
":prefix": &types.AttributeValueMemberS{
Value: blobMinibatchMappingSKPrefix + blobKey.MetadataHash,
},
})
if err != nil {
return nil, err
}

blobMinibatchMappings := make([]*batcher.BlobMinibatchMapping, len(items))
for i, item := range items {
blobMinibatchMappings[i], err = UnmarshalBlobMinibatchMapping(item)
if err != nil {
m.logger.Errorf("failed to unmarshal blob minibatch mapping at index %d: %v", i, err)
return nil, err
}
}

return blobMinibatchMappings, nil
}
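For readability, here is a rough sketch (not part of the commit) of the item shape that MarshalBlobMinibatchMapping above produces. The attribute names and the sort-key layout come from the code in this diff; the concrete hash values, batch ID, and blob index are invented purely for illustration.

package example

import (
	"fmt"

	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
	"github.com/google/uuid"
)

// exampleItem shows the key attributes set explicitly by MarshalBlobMinibatchMapping.
func exampleItem() map[string]types.AttributeValue {
	batchID := uuid.New()
	return map[string]types.AttributeValue{
		// Table partition key: the batch the blob was dispersed in.
		"BatchID": &types.AttributeValueMemberS{Value: batchID.String()},
		// Table sort key: prefix + metadata hash + batch ID + blob index.
		"SK": &types.AttributeValueMemberS{Value: fmt.Sprintf("BLOB_MINIBATCH_MAPPING#%s#%s#%d", "metadata-hash", batchID, 22)},
		// Partition key of the BlobMinibatchMappingIndex GSI (SK is reused as its range key).
		"BlobHash":     &types.AttributeValueMemberS{Value: "blob-hash"},
		"MetadataHash": &types.AttributeValueMemberS{Value: "metadata-hash"},
		// ...plus the remaining attributes emitted by attributevalue.MarshalMap of the
		// BlobMinibatchMapping struct (MinibatchIndex, BlobIndex, commitments, quorum infos).
	}
}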
98 changes: 98 additions & 0 deletions disperser/batcher/batchstore/minibatch_store_test.go
@@ -11,10 +11,14 @@ import (
"github.com/Layr-Labs/eigenda/common/aws/dynamodb"
test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/batcher"
"github.com/Layr-Labs/eigenda/disperser/batcher/batchstore"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/inabox/deploy"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/consensys/gnark-crypto/ecc/bn254"
"github.com/consensys/gnark-crypto/ecc/bn254/fp"
gcommon "github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"
"github.com/ory/dockertest/v3"
@@ -371,3 +375,97 @@ func TestDispersalStatus(t *testing.T) {
assert.NoError(t, err)
assert.True(t, dispersed)
}

func TestGetBlobMinibatchMappings(t *testing.T) {
ctx := context.Background()
batchID, err := uuid.NewV7()
assert.NoError(t, err)
blobKey := disperser.BlobKey{
BlobHash: "blob-hash",
MetadataHash: "metadata-hash",
}
var commitX, commitY, lengthX, lengthY fp.Element
_, err = commitX.SetString("21661178944771197726808973281966770251114553549453983978976194544185382599016")
assert.NoError(t, err)
_, err = commitY.SetString("9207254729396071334325696286939045899948985698134704137261649190717970615186")
assert.NoError(t, err)
commitment := &encoding.G1Commitment{
X: commitX,
Y: commitY,
}
_, err = lengthX.SetString("18730744272503541936633286178165146673834730535090946570310418711896464442549")
assert.NoError(t, err)
_, err = lengthY.SetString("15356431458378126778840641829778151778222945686256112821552210070627093656047")
assert.NoError(t, err)
var lengthXA0, lengthXA1, lengthYA0, lengthYA1 fp.Element
_, err = lengthXA0.SetString("10857046999023057135944570762232829481370756359578518086990519993285655852781")
assert.NoError(t, err)
_, err = lengthXA1.SetString("11559732032986387107991004021392285783925812861821192530917403151452391805634")
assert.NoError(t, err)
_, err = lengthYA0.SetString("8495653923123431417604973247489272438418190587263600148770280649306958101930")
assert.NoError(t, err)
_, err = lengthYA1.SetString("4082367875863433681332203403145435568316851327593401208105741076214120093531")
assert.NoError(t, err)

var lengthProof, lengthCommitment bn254.G2Affine
lengthProof.X.A0 = lengthXA0
lengthProof.X.A1 = lengthXA1
lengthProof.Y.A0 = lengthYA0
lengthProof.Y.A1 = lengthYA1

lengthCommitment = lengthProof
expectedDataLength := 111
expectedChunkLength := uint(222)
err = minibatchStore.PutBlobMinibatchMappings(ctx, []*batcher.BlobMinibatchMapping{
{
BlobKey: &blobKey,
BatchID: batchID,
MinibatchIndex: 11,
BlobIndex: 22,
BlobCommitments: encoding.BlobCommitments{
Commitment: commitment,
LengthCommitment: (*encoding.G2Commitment)(&lengthCommitment),
Length: uint(expectedDataLength),
LengthProof: (*encoding.LengthProof)(&lengthProof),
},
BlobQuorumInfos: []*core.BlobQuorumInfo{
{
ChunkLength: expectedChunkLength,
SecurityParam: core.SecurityParam{
QuorumID: 1,
ConfirmationThreshold: 55,
AdversaryThreshold: 33,
QuorumRate: 123,
},
},
},
},
})
assert.NoError(t, err)

mapping, err := minibatchStore.GetBlobMinibatchMappings(ctx, blobKey)
assert.NoError(t, err)
assert.Equal(t, 1, len(mapping))
assert.Equal(t, &blobKey, mapping[0].BlobKey)
assert.Equal(t, batchID, mapping[0].BatchID)
assert.Equal(t, uint(11), mapping[0].MinibatchIndex)
assert.Equal(t, uint(22), mapping[0].BlobIndex)
assert.Equal(t, commitment, mapping[0].BlobCommitments.Commitment)
lengthCommitmentBytes, err := mapping[0].BlobCommitments.LengthCommitment.Serialize()
assert.NoError(t, err)
expectedLengthCommitmentBytes := lengthCommitment.Bytes()
assert.Equal(t, expectedLengthCommitmentBytes[:], lengthCommitmentBytes[:])
assert.Equal(t, expectedDataLength, int(mapping[0].BlobCommitments.Length))
lengthProofBytes, err := mapping[0].BlobCommitments.LengthProof.Serialize()
assert.NoError(t, err)
expectedLengthProofBytes := lengthProof.Bytes()
assert.Equal(t, expectedLengthProofBytes[:], lengthProofBytes[:])
assert.Len(t, mapping[0].BlobQuorumInfos, 1)
assert.Equal(t, expectedChunkLength, mapping[0].BlobQuorumInfos[0].ChunkLength)
assert.Equal(t, core.SecurityParam{
QuorumID: 1,
ConfirmationThreshold: 55,
AdversaryThreshold: 33,
QuorumRate: 123,
}, mapping[0].BlobQuorumInfos[0].SecurityParam)
}
47 changes: 43 additions & 4 deletions disperser/batcher/inmem/minibatch_store.go
@@ -7,6 +7,7 @@ import (
"sync"

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/batcher"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/google/uuid"
@@ -23,6 +24,8 @@ type minibatchStore struct {
DispersalRequests map[uuid.UUID]map[uint][]*batcher.DispersalRequest
// DispersalResponses maps batch IDs to a map from minibatch indices to dispersal responses
DispersalResponses map[uuid.UUID]map[uint][]*batcher.DispersalResponse
// BlobMinibatchMapping maps a blob key to a map from batch ID to that blob's minibatch mapping
BlobMinibatchMapping map[string]map[uuid.UUID]*batcher.BlobMinibatchMapping

mu sync.RWMutex
logger logging.Logger
@@ -32,10 +35,11 @@ var _ batcher.MinibatchStore = (*minibatchStore)(nil)

func NewMinibatchStore(logger logging.Logger) batcher.MinibatchStore {
return &minibatchStore{
BatchRecords: make(map[uuid.UUID]*batcher.BatchRecord),
MinibatchRecords: make(map[uuid.UUID]map[uint]*batcher.MinibatchRecord),
DispersalRequests: make(map[uuid.UUID]map[uint][]*batcher.DispersalRequest),
DispersalResponses: make(map[uuid.UUID]map[uint][]*batcher.DispersalResponse),
BatchRecords: make(map[uuid.UUID]*batcher.BatchRecord),
MinibatchRecords: make(map[uuid.UUID]map[uint]*batcher.MinibatchRecord),
DispersalRequests: make(map[uuid.UUID]map[uint][]*batcher.DispersalRequest),
DispersalResponses: make(map[uuid.UUID]map[uint][]*batcher.DispersalResponse),
BlobMinibatchMapping: make(map[string]map[uuid.UUID]*batcher.BlobMinibatchMapping),

logger: logger,
}
@@ -234,6 +238,41 @@ func (m *minibatchStore) GetMinibatchDispersalResponses(ctx context.Context, bat
return m.DispersalResponses[batchID][minibatchIndex], nil
}

func (m *minibatchStore) GetBlobMinibatchMappings(ctx context.Context, blobKey disperser.BlobKey) ([]*batcher.BlobMinibatchMapping, error) {
m.mu.RLock()
defer m.mu.RUnlock()

if _, ok := m.BlobMinibatchMapping[blobKey.String()]; !ok {
return nil, nil
}

res := make([]*batcher.BlobMinibatchMapping, 0)
for _, blobMinibatchMapping := range m.BlobMinibatchMapping[blobKey.String()] {
res = append(res, blobMinibatchMapping)
}

return res, nil
}

func (m *minibatchStore) PutBlobMinibatchMappings(ctx context.Context, blobMinibatchMappings []*batcher.BlobMinibatchMapping) error {
m.mu.Lock()
defer m.mu.Unlock()

for _, blobMinibatchMapping := range blobMinibatchMappings {
if blobMinibatchMapping.BlobKey == nil {
return errors.New("blob key is nil")
}
blobKey := blobMinibatchMapping.BlobKey.String()

if _, ok := m.BlobMinibatchMapping[blobKey]; !ok {
m.BlobMinibatchMapping[blobKey] = make(map[uuid.UUID]*batcher.BlobMinibatchMapping)
}

m.BlobMinibatchMapping[blobKey][blobMinibatchMapping.BatchID] = blobMinibatchMapping
}
return nil
}

func (m *minibatchStore) GetLatestFormedBatch(ctx context.Context) (batch *batcher.BatchRecord, minibatches []*batcher.MinibatchRecord, err error) {
m.mu.RLock()
defer m.mu.RUnlock()