From 931a4660297ad003082fd428fdfbca30135a750e Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Sat, 20 Jul 2024 16:41:43 -0700 Subject: [PATCH 1/8] Minibatch dynamodb store using single table schema design --- .../batcher/batchstore/minibatch_store.go | 293 ++++++++++++++++++ .../batchstore/minibatch_store_test.go | 168 ++++++++++ 2 files changed, 461 insertions(+) create mode 100644 disperser/batcher/batchstore/minibatch_store.go create mode 100644 disperser/batcher/batchstore/minibatch_store_test.go diff --git a/disperser/batcher/batchstore/minibatch_store.go b/disperser/batcher/batchstore/minibatch_store.go new file mode 100644 index 0000000000..5edb9b9589 --- /dev/null +++ b/disperser/batcher/batchstore/minibatch_store.go @@ -0,0 +1,293 @@ +package batchstore + +import ( + "context" + "fmt" + "time" + + commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + "github.com/Layr-Labs/eigenda/disperser/batcher" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/google/uuid" +) + +const ( + batchKey = "BATCH#" + minibatchKey = "MINIBATCH#" + dispersalRequestKey = "DISPERSAL_REQUEST#" + dispersalResponseKey = "DISPERSAL_RESPONSE#" +) + +type MinibatchStore struct { + dynamoDBClient *commondynamodb.Client + tableName string + logger logging.Logger + ttl time.Duration +} + +func NewMinibatchStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *MinibatchStore { + logger.Debugf("creating minibatch store with table %s with TTL: %s", tableName, ttl) + return &MinibatchStore{ + dynamoDBClient: dynamoDBClient, + logger: logger.With("component", "MinibatchStore"), + tableName: tableName, + ttl: ttl, + } +} + +func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput { + return &dynamodb.CreateTableInput{ + AttributeDefinitions: []types.AttributeDefinition{ + { + AttributeName: aws.String("PK"), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String("SK"), + AttributeType: types.ScalarAttributeTypeS, + }, + }, + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("PK"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("SK"), + KeyType: types.KeyTypeRange, + }, + }, + TableName: aws.String(tableName), + GlobalSecondaryIndexes: nil, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + } +} + +func MarshalBatchRecord(batch *batcher.BatchRecord) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(batch) + if err != nil { + return nil, err + } + fields["PK"] = &types.AttributeValueMemberS{Value: batchKey + batch.ID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: batchKey + batch.ID.String()} + fields["CreatedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", batch.CreatedAt.UTC().Unix())} + return fields, nil +} + +func MarshalMinibatchRecord(minibatch *batcher.MinibatchRecord) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(minibatch) + if err != nil { + return nil, err + } + fields["PK"] = &types.AttributeValueMemberS{Value: batchKey + minibatch.BatchID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: minibatchKey + fmt.Sprintf("%d", minibatch.MinibatchIndex)} + return fields, nil +} + +func MarshalDispersalRequest(request *batcher.DispersalRequest) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(request) + if err != nil { + return nil, err + } + fields["PK"] = &types.AttributeValueMemberS{Value: batchKey + request.BatchID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: dispersalRequestKey + fmt.Sprintf("%d", request.MinibatchIndex)} + fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", request.RequestedAt.UTC().Unix())} + return fields, nil +} + +func MarshalDispersalResponse(response *batcher.DispersalResponse) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(response) + if err != nil { + return nil, err + } + fields["PK"] = &types.AttributeValueMemberS{Value: batchKey + response.BatchID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: dispersalResponseKey + fmt.Sprintf("%d", response.MinibatchIndex)} + fields["RespondedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.RequestedAt.UTC().Unix())} + return fields, nil +} +func UnmarshalBatchRecord(item commondynamodb.Item) (*batcher.BatchRecord, error) { + batch := batcher.BatchRecord{} + err := attributevalue.UnmarshalMap(item, &batch) + if err != nil { + return nil, err + } + batch.CreatedAt = batch.CreatedAt.UTC() + return &batch, nil +} + +func UnmarshalMinibatchRecord(item commondynamodb.Item) (*batcher.MinibatchRecord, error) { + minibatch := batcher.MinibatchRecord{} + err := attributevalue.UnmarshalMap(item, &minibatch) + if err != nil { + return nil, err + } + return &minibatch, nil +} + +func UnmarshalDispersalRequest(item commondynamodb.Item) (*batcher.DispersalRequest, error) { + request := batcher.DispersalRequest{} + err := attributevalue.UnmarshalMap(item, &request) + if err != nil { + return nil, err + } + request.RequestedAt = request.RequestedAt.UTC() + return &request, nil +} + +func UnmarshalDispersalResponse(item commondynamodb.Item) (*batcher.DispersalResponse, error) { + response := batcher.DispersalResponse{} + err := attributevalue.UnmarshalMap(item, &response) + if err != nil { + return nil, err + } + response.RespondedAt = response.RespondedAt.UTC() + return &response, nil +} + +func (m *MinibatchStore) PutBatch(ctx context.Context, batch *batcher.BatchRecord) error { + item, err := MarshalBatchRecord(batch) + if err != nil { + return err + } + + return m.dynamoDBClient.PutItem(ctx, m.tableName, item) +} + +func (m *MinibatchStore) PutMinibatch(ctx context.Context, minibatch *batcher.MinibatchRecord) error { + item, err := MarshalMinibatchRecord(minibatch) + if err != nil { + return err + } + + return m.dynamoDBClient.PutItem(ctx, m.tableName, item) +} + +func (m *MinibatchStore) PutDispersalRequest(ctx context.Context, request *batcher.DispersalRequest) error { + item, err := MarshalDispersalRequest(request) + if err != nil { + return err + } + + fmt.Printf("%v", item) + return m.dynamoDBClient.PutItem(ctx, m.tableName, item) +} + +func (m *MinibatchStore) PutDispersalResponse(ctx context.Context, response *batcher.DispersalResponse) error { + item, err := MarshalDispersalResponse(response) + if err != nil { + return err + } + + return m.dynamoDBClient.PutItem(ctx, m.tableName, item) +} + +func (m *MinibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batcher.BatchRecord, error) { + item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ + "PK": &types.AttributeValueMemberS{ + Value: batchKey + batchID.String(), + }, + "SK": &types.AttributeValueMemberS{ + Value: batchKey + batchID.String(), + }, + }) + if err != nil { + m.logger.Errorf("failed to get batch from DynamoDB: %v", err) + return nil, err + } + + if item == nil { + return nil, nil + } + + batch, err := UnmarshalBatchRecord(item) + if err != nil { + m.logger.Errorf("failed to unmarshal batch record from DynamoDB: %v", err) + return nil, err + } + return batch, nil +} + +func (m *MinibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.MinibatchRecord, error) { + item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ + "PK": &types.AttributeValueMemberS{ + Value: batchKey + batchID.String(), + }, + "SK": &types.AttributeValueMemberS{ + Value: minibatchKey + fmt.Sprintf("%d", minibatchIndex), + }, + }) + if err != nil { + m.logger.Errorf("failed to get minibatch from DynamoDB: %v", err) + return nil, err + } + + if item == nil { + return nil, nil + } + + minibatch, err := UnmarshalMinibatchRecord(item) + if err != nil { + m.logger.Errorf("failed to unmarshal minibatch record from DynamoDB: %v", err) + return nil, err + } + return minibatch, nil +} + +func (m *MinibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.DispersalRequest, error) { + item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ + "PK": &types.AttributeValueMemberS{ + Value: batchKey + batchID.String(), + }, + "SK": &types.AttributeValueMemberS{ + Value: dispersalRequestKey + fmt.Sprintf("%d", minibatchIndex), + }, + }) + if err != nil { + m.logger.Errorf("failed to get dispersal request from DynamoDB: %v", err) + return nil, err + } + + if item == nil { + return nil, nil + } + + request, err := UnmarshalDispersalRequest(item) + if err != nil { + m.logger.Errorf("failed to unmarshal dispersal request from DynamoDB: %v", err) + return nil, err + } + return request, nil +} + +func (m *MinibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.DispersalResponse, error) { + item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ + "PK": &types.AttributeValueMemberS{ + Value: batchKey + batchID.String(), + }, + "SK": &types.AttributeValueMemberS{ + Value: dispersalResponseKey + fmt.Sprintf("%d", minibatchIndex), + }, + }) + if err != nil { + m.logger.Errorf("failed to get dispersal response from DynamoDB: %v", err) + return nil, err + } + + if item == nil { + return nil, nil + } + + response, err := UnmarshalDispersalResponse(item) + if err != nil { + m.logger.Errorf("failed to unmarshal dispersal response from DynamoDB: %v", err) + return nil, err + } + return response, nil +} diff --git a/disperser/batcher/batchstore/minibatch_store_test.go b/disperser/batcher/batchstore/minibatch_store_test.go new file mode 100644 index 0000000000..0e3cdce9be --- /dev/null +++ b/disperser/batcher/batchstore/minibatch_store_test.go @@ -0,0 +1,168 @@ +package batchstore_test + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/Layr-Labs/eigenda/common/aws" + "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser/batcher" + "github.com/Layr-Labs/eigenda/disperser/batcher/batchstore" + "github.com/Layr-Labs/eigenda/inabox/deploy" + "github.com/Layr-Labs/eigensdk-go/logging" + gcommon "github.com/ethereum/go-ethereum/common" + "github.com/google/uuid" + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/assert" +) + +var ( + logger = logging.NewNoopLogger() + + dockertestPool *dockertest.Pool + dockertestResource *dockertest.Resource + + deployLocalStack bool + localStackPort = "4566" + + dynamoClient *dynamodb.Client + minibatchStore *batchstore.MinibatchStore + + UUID = uuid.New() + minibatchTableName = fmt.Sprintf("test-MinibatchStore-%v", UUID) +) + +func setup(m *testing.M) { + if deployLocalStack { + var err error + dockertestPool, dockertestResource, err = deploy.StartDockertestWithLocalstackContainer(localStackPort) + if err != nil { + teardown() + panic("failed to start localstack container") + } + + } + + cfg := aws.ClientConfig{ + Region: "us-east-1", + AccessKey: "localstack", + SecretAccessKey: "localstack", + EndpointURL: fmt.Sprintf("http://0.0.0.0:%s", localStackPort), + } + + _, err := test_utils.CreateTable(context.Background(), cfg, minibatchTableName, batchstore.GenerateTableSchema(minibatchTableName, 10, 10)) + if err != nil { + teardown() + panic("failed to create dynamodb table: " + err.Error()) + } + + dynamoClient, err = dynamodb.NewClient(cfg, logger) + if err != nil { + teardown() + panic("failed to create dynamodb client: " + err.Error()) + } + + fmt.Printf("m: %v\n", m) + minibatchStore = batchstore.NewMinibatchStore(dynamoClient, logger, minibatchTableName, time.Hour) +} + +func teardown() { + if deployLocalStack { + deploy.PurgeDockertestResources(dockertestPool, dockertestResource) + } +} + +func TestMain(m *testing.M) { + setup(m) + code := m.Run() + teardown() + os.Exit(code) +} + +func TestPutBatch(t *testing.T) { + ctx := context.Background() + id, err := uuid.NewV7() + assert.NoError(t, err) + ts := time.Now().Truncate(time.Second).UTC() + batch := &batcher.BatchRecord{ + ID: id, + CreatedAt: ts, + ReferenceBlockNumber: 1, + HeaderHash: [32]byte{1}, + AggregatePubKey: nil, + AggregateSignature: nil, + } + err = minibatchStore.PutBatch(ctx, batch) + assert.NoError(t, err) + b, err := minibatchStore.GetBatch(ctx, batch.ID) + assert.NoError(t, err) + assert.Equal(t, batch, b) +} + +func TestPutMinibatch(t *testing.T) { + ctx := context.Background() + id, err := uuid.NewV7() + assert.NoError(t, err) + minibatch := &batcher.MinibatchRecord{ + BatchID: id, + MinibatchIndex: 12, + BlobHeaderHashes: [][32]byte{{1}}, + BatchSize: 1, + ReferenceBlockNumber: 1, + } + err = minibatchStore.PutMinibatch(ctx, minibatch) + assert.NoError(t, err) + m, err := minibatchStore.GetMinibatch(ctx, minibatch.BatchID, minibatch.MinibatchIndex) + assert.NoError(t, err) + assert.Equal(t, minibatch, m) +} + +func TestPutDispersalRequest(t *testing.T) { + ctx := context.Background() + id, err := uuid.NewV7() + assert.NoError(t, err) + ts := time.Now().Truncate(time.Second).UTC() + request := &batcher.DispersalRequest{ + BatchID: id, + MinibatchIndex: 0, + OperatorID: core.OperatorID([32]byte{1}), + OperatorAddress: gcommon.HexToAddress("0x0"), + NumBlobs: 1, + RequestedAt: ts, + } + err = minibatchStore.PutDispersalRequest(ctx, request) + assert.NoError(t, err) + r, err := minibatchStore.GetDispersalRequest(ctx, request.BatchID, request.MinibatchIndex) + assert.NoError(t, err) + assert.Equal(t, request, r) +} + +func TestPutDispersalResponse(t *testing.T) { + ctx := context.Background() + id, err := uuid.NewV7() + assert.NoError(t, err) + ts := time.Now().Truncate(time.Second).UTC() + response := &batcher.DispersalResponse{ + DispersalRequest: batcher.DispersalRequest{ + BatchID: id, + MinibatchIndex: 0, + OperatorID: core.OperatorID([32]byte{1}), + OperatorAddress: gcommon.HexToAddress("0x0"), + NumBlobs: 1, + RequestedAt: ts, + }, + Signatures: nil, + RespondedAt: ts, + Error: nil, + } + err = minibatchStore.PutDispersalResponse(ctx, response) + assert.NoError(t, err) + r, err := minibatchStore.GetDispersalResponse(ctx, response.BatchID, response.MinibatchIndex) + assert.NoError(t, err) + assert.Equal(t, response, r) +} From 342073ebe3308dd032ce5b5e15fdd32e14974288 Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Mon, 22 Jul 2024 16:58:36 -0700 Subject: [PATCH 2/8] Create example GSI on OperatorID RequestedAt Replace DispersalRequest.OperatorID field with raw byte array def so that dynamo can use unmarshalMap --- .../batcher/batchstore/minibatch_store.go | 40 +++++++++++++++++-- .../batchstore/minibatch_store_test.go | 5 ++- disperser/batcher/minibatch_store.go | 6 +-- 3 files changed, 42 insertions(+), 9 deletions(-) diff --git a/disperser/batcher/batchstore/minibatch_store.go b/disperser/batcher/batchstore/minibatch_store.go index 5edb9b9589..82e3bd8617 100644 --- a/disperser/batcher/batchstore/minibatch_store.go +++ b/disperser/batcher/batchstore/minibatch_store.go @@ -50,6 +50,14 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit AttributeName: aws.String("SK"), AttributeType: types.ScalarAttributeTypeS, }, + { + AttributeName: aws.String("OperatorID"), + AttributeType: types.ScalarAttributeTypeB, + }, + { + AttributeName: aws.String("RequestedAt"), + AttributeType: types.ScalarAttributeTypeN, + }, }, KeySchema: []types.KeySchemaElement{ { @@ -61,8 +69,29 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit KeyType: types.KeyTypeRange, }, }, - TableName: aws.String(tableName), - GlobalSecondaryIndexes: nil, + TableName: aws.String(tableName), + GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ + { + IndexName: aws.String("OperatorID_RequestedAt_Index"), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("OperatorID"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("RequestedAt"), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + }, + }, ProvisionedThroughput: &types.ProvisionedThroughput{ ReadCapacityUnits: aws.Int64(readCapacityUnits), WriteCapacityUnits: aws.Int64(writeCapacityUnits), @@ -109,7 +138,8 @@ func MarshalDispersalResponse(response *batcher.DispersalResponse) (map[string]t } fields["PK"] = &types.AttributeValueMemberS{Value: batchKey + response.BatchID.String()} fields["SK"] = &types.AttributeValueMemberS{Value: dispersalResponseKey + fmt.Sprintf("%d", response.MinibatchIndex)} - fields["RespondedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.RequestedAt.UTC().Unix())} + fields["RespondedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.RespondedAt.UTC().Unix())} + fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.DispersalRequest.RequestedAt.UTC().Unix())} return fields, nil } func UnmarshalBatchRecord(item commondynamodb.Item) (*batcher.BatchRecord, error) { @@ -135,8 +165,9 @@ func UnmarshalDispersalRequest(item commondynamodb.Item) (*batcher.DispersalRequ request := batcher.DispersalRequest{} err := attributevalue.UnmarshalMap(item, &request) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to unmarshal dispersal request from DynamoDB: %v", err) } + request.RequestedAt = request.RequestedAt.UTC() return &request, nil } @@ -148,6 +179,7 @@ func UnmarshalDispersalResponse(item commondynamodb.Item) (*batcher.DispersalRes return nil, err } response.RespondedAt = response.RespondedAt.UTC() + response.DispersalRequest.RequestedAt = response.DispersalRequest.RequestedAt.UTC() return &response, nil } diff --git a/disperser/batcher/batchstore/minibatch_store_test.go b/disperser/batcher/batchstore/minibatch_store_test.go index 0e3cdce9be..75f02dd681 100644 --- a/disperser/batcher/batchstore/minibatch_store_test.go +++ b/disperser/batcher/batchstore/minibatch_store_test.go @@ -28,7 +28,7 @@ var ( dockertestResource *dockertest.Resource deployLocalStack bool - localStackPort = "4566" + localStackPort = "4570" dynamoClient *dynamodb.Client minibatchStore *batchstore.MinibatchStore @@ -130,7 +130,7 @@ func TestPutDispersalRequest(t *testing.T) { request := &batcher.DispersalRequest{ BatchID: id, MinibatchIndex: 0, - OperatorID: core.OperatorID([32]byte{1}), + OperatorID: core.OperatorID([32]byte{123}), OperatorAddress: gcommon.HexToAddress("0x0"), NumBlobs: 1, RequestedAt: ts, @@ -165,4 +165,5 @@ func TestPutDispersalResponse(t *testing.T) { r, err := minibatchStore.GetDispersalResponse(ctx, response.BatchID, response.MinibatchIndex) assert.NoError(t, err) assert.Equal(t, response, r) + assert.Error(t, err) } diff --git a/disperser/batcher/minibatch_store.go b/disperser/batcher/minibatch_store.go index d1e3ed24c1..330fdf4121 100644 --- a/disperser/batcher/minibatch_store.go +++ b/disperser/batcher/minibatch_store.go @@ -47,9 +47,9 @@ type MinibatchRecord struct { } type DispersalRequest struct { - BatchID uuid.UUID - MinibatchIndex uint - core.OperatorID + BatchID uuid.UUID + MinibatchIndex uint + OperatorID [32]byte OperatorAddress gcommon.Address Socket string NumBlobs uint From ef2e3b99c6fa68b3d5d7650339b55ca5ba1e8b29 Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Tue, 23 Jul 2024 16:55:44 -0700 Subject: [PATCH 3/8] Refactor dynamodb client Convert PK to BatchID Adds custom minibatch struct conversion for batch, minibatch, dispersalRequest, dispersalResponse --- .../batcher/batchstore/minibatch_store.go | 236 +++++++++++++++--- .../batchstore/minibatch_store_test.go | 3 +- disperser/batcher/minibatch_store.go | 6 +- 3 files changed, 201 insertions(+), 44 deletions(-) diff --git a/disperser/batcher/batchstore/minibatch_store.go b/disperser/batcher/batchstore/minibatch_store.go index 82e3bd8617..d4ba8d8a63 100644 --- a/disperser/batcher/batchstore/minibatch_store.go +++ b/disperser/batcher/batchstore/minibatch_store.go @@ -6,22 +6,56 @@ import ( "time" commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/disperser/batcher" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + gcommon "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" ) const ( - batchKey = "BATCH#" - minibatchKey = "MINIBATCH#" - dispersalRequestKey = "DISPERSAL_REQUEST#" - dispersalResponseKey = "DISPERSAL_RESPONSE#" + batchSK = "BATCH#" + minibatchSK = "MINIBATCH#" + dispersalRequestSK = "DISPERSAL_REQUEST#" + dispersalResponseSK = "DISPERSAL_RESPONSE#" ) +type DynamoBatchRecord struct { + BatchID string + CreatedAt time.Time + ReferenceBlockNumber uint + HeaderHash [32]byte + AggregatePubKey *core.G2Point + AggregateSignature *core.Signature +} + +type DynamoMinibatchRecord struct { + BatchID string + MinibatchIndex uint + BlobHeaderHashes [][32]byte + BatchSize uint64 + ReferenceBlockNumber uint +} + +type DynamoDispersalRequest struct { + BatchID string + MinibatchIndex uint + OperatorID string + OperatorAddress string + NumBlobs uint + RequestedAt time.Time +} + +type DynamoDispersalResponse struct { + DynamoDispersalRequest + Signatures []*core.Signature + RespondedAt time.Time + Error error +} type MinibatchStore struct { dynamoDBClient *commondynamodb.Client tableName string @@ -43,7 +77,7 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit return &dynamodb.CreateTableInput{ AttributeDefinitions: []types.AttributeDefinition{ { - AttributeName: aws.String("PK"), + AttributeName: aws.String("BatchID"), AttributeType: types.ScalarAttributeTypeS, }, { @@ -52,7 +86,7 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit }, { AttributeName: aws.String("OperatorID"), - AttributeType: types.ScalarAttributeTypeB, + AttributeType: types.ScalarAttributeTypeS, }, { AttributeName: aws.String("RequestedAt"), @@ -61,7 +95,7 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit }, KeySchema: []types.KeySchemaElement{ { - AttributeName: aws.String("PK"), + AttributeName: aws.String("BatchID"), KeyType: types.KeyTypeHash, }, { @@ -99,62 +133,176 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit } } +func ToDynamoBatchRecord(br batcher.BatchRecord) DynamoBatchRecord { + return DynamoBatchRecord{ + BatchID: br.ID.String(), + CreatedAt: br.CreatedAt, + ReferenceBlockNumber: br.ReferenceBlockNumber, + HeaderHash: br.HeaderHash, + AggregatePubKey: br.AggregatePubKey, + AggregateSignature: br.AggregateSignature, + } +} + +func ToDynamoMinibatchRecord(br batcher.MinibatchRecord) DynamoMinibatchRecord { + return DynamoMinibatchRecord{ + BatchID: br.BatchID.String(), + MinibatchIndex: br.MinibatchIndex, + BlobHeaderHashes: br.BlobHeaderHashes, + BatchSize: br.BatchSize, + ReferenceBlockNumber: br.ReferenceBlockNumber, + } +} + +func ToDynamoDispersalRequest(dr batcher.DispersalRequest) DynamoDispersalRequest { + return DynamoDispersalRequest{ + BatchID: dr.BatchID.String(), + MinibatchIndex: dr.MinibatchIndex, + OperatorID: dr.OperatorID.Hex(), + OperatorAddress: dr.OperatorAddress.Hex(), + NumBlobs: dr.NumBlobs, + RequestedAt: dr.RequestedAt, + } +} + +func ToDynamoDispersalResponse(dr batcher.DispersalResponse) DynamoDispersalResponse { + return DynamoDispersalResponse{ + DynamoDispersalRequest: ToDynamoDispersalRequest(dr.DispersalRequest), + Signatures: dr.Signatures, + RespondedAt: dr.RespondedAt, + Error: dr.Error, + } +} + +func FromDynamoBatchRecord(dbr DynamoBatchRecord) (batcher.BatchRecord, error) { + batchID, err := uuid.Parse(dbr.BatchID) + if err != nil { + return batcher.BatchRecord{}, fmt.Errorf("failed to convert dynamo batch record batch ID %v from string: %v", dbr.BatchID, err) + } + + return batcher.BatchRecord{ + ID: batchID, + CreatedAt: dbr.CreatedAt, + ReferenceBlockNumber: dbr.ReferenceBlockNumber, + HeaderHash: dbr.HeaderHash, + AggregatePubKey: dbr.AggregatePubKey, + AggregateSignature: dbr.AggregateSignature, + }, nil +} + +func FromDynamoMinibatchRecord(dbr DynamoMinibatchRecord) (batcher.MinibatchRecord, error) { + batchID, err := uuid.Parse(dbr.BatchID) + if err != nil { + return batcher.MinibatchRecord{}, fmt.Errorf("failed to convert dynamo minibatch record batch ID %v from string: %v", dbr.BatchID, err) + } + + return batcher.MinibatchRecord{ + BatchID: batchID, + MinibatchIndex: dbr.MinibatchIndex, + BlobHeaderHashes: dbr.BlobHeaderHashes, + BatchSize: dbr.BatchSize, + ReferenceBlockNumber: dbr.ReferenceBlockNumber, + }, nil +} + +func FromDynamoDispersalRequest(ddr DynamoDispersalRequest) (batcher.DispersalRequest, error) { + batchID, err := uuid.Parse(ddr.BatchID) + if err != nil { + return batcher.DispersalRequest{}, fmt.Errorf("failed to convert dynamo dispersal request batch ID %v from string: %v", ddr.BatchID, err) + } + operatorID, err := core.OperatorIDFromHex(ddr.OperatorID) + if err != nil { + return batcher.DispersalRequest{}, fmt.Errorf("failed to convert dynamo dispersal request operator ID %v from hex: %v", ddr.OperatorID, err) + } + + return batcher.DispersalRequest{ + BatchID: batchID, + MinibatchIndex: ddr.MinibatchIndex, + OperatorID: operatorID, + OperatorAddress: gcommon.HexToAddress(ddr.OperatorAddress), + NumBlobs: ddr.NumBlobs, + RequestedAt: ddr.RequestedAt, + }, nil +} + +func FromDynamoDispersalResponse(ddr DynamoDispersalResponse) (batcher.DispersalResponse, error) { + request, err := FromDynamoDispersalRequest(ddr.DynamoDispersalRequest) + if err != nil { + return batcher.DispersalResponse{}, err + } + + return batcher.DispersalResponse{ + DispersalRequest: request, + Signatures: ddr.Signatures, + RespondedAt: ddr.RespondedAt, + Error: ddr.Error, + }, nil +} + func MarshalBatchRecord(batch *batcher.BatchRecord) (map[string]types.AttributeValue, error) { - fields, err := attributevalue.MarshalMap(batch) + fields, err := attributevalue.MarshalMap(ToDynamoBatchRecord(*batch)) if err != nil { return nil, err } - fields["PK"] = &types.AttributeValueMemberS{Value: batchKey + batch.ID.String()} - fields["SK"] = &types.AttributeValueMemberS{Value: batchKey + batch.ID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: batchSK + batch.ID.String()} fields["CreatedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", batch.CreatedAt.UTC().Unix())} return fields, nil } func MarshalMinibatchRecord(minibatch *batcher.MinibatchRecord) (map[string]types.AttributeValue, error) { - fields, err := attributevalue.MarshalMap(minibatch) + fields, err := attributevalue.MarshalMap(ToDynamoMinibatchRecord(*minibatch)) if err != nil { return nil, err } - fields["PK"] = &types.AttributeValueMemberS{Value: batchKey + minibatch.BatchID.String()} - fields["SK"] = &types.AttributeValueMemberS{Value: minibatchKey + fmt.Sprintf("%d", minibatch.MinibatchIndex)} + fields["SK"] = &types.AttributeValueMemberS{Value: minibatchSK + fmt.Sprintf("%d", minibatch.MinibatchIndex)} return fields, nil } func MarshalDispersalRequest(request *batcher.DispersalRequest) (map[string]types.AttributeValue, error) { - fields, err := attributevalue.MarshalMap(request) + fields, err := attributevalue.MarshalMap(ToDynamoDispersalRequest(*request)) if err != nil { return nil, err } - fields["PK"] = &types.AttributeValueMemberS{Value: batchKey + request.BatchID.String()} - fields["SK"] = &types.AttributeValueMemberS{Value: dispersalRequestKey + fmt.Sprintf("%d", request.MinibatchIndex)} + fields["SK"] = &types.AttributeValueMemberS{Value: dispersalRequestSK + fmt.Sprintf("%d", request.MinibatchIndex)} fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", request.RequestedAt.UTC().Unix())} return fields, nil } func MarshalDispersalResponse(response *batcher.DispersalResponse) (map[string]types.AttributeValue, error) { - fields, err := attributevalue.MarshalMap(response) + fields, err := attributevalue.MarshalMap(ToDynamoDispersalResponse(*response)) if err != nil { return nil, err } - fields["PK"] = &types.AttributeValueMemberS{Value: batchKey + response.BatchID.String()} - fields["SK"] = &types.AttributeValueMemberS{Value: dispersalResponseKey + fmt.Sprintf("%d", response.MinibatchIndex)} + fields["SK"] = &types.AttributeValueMemberS{Value: dispersalResponseSK + fmt.Sprintf("%d", response.MinibatchIndex)} fields["RespondedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.RespondedAt.UTC().Unix())} fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.DispersalRequest.RequestedAt.UTC().Unix())} return fields, nil } + func UnmarshalBatchRecord(item commondynamodb.Item) (*batcher.BatchRecord, error) { - batch := batcher.BatchRecord{} - err := attributevalue.UnmarshalMap(item, &batch) + dbr := DynamoBatchRecord{} + err := attributevalue.UnmarshalMap(item, &dbr) + if err != nil { + return nil, err + } + + batch, err := FromDynamoBatchRecord(dbr) if err != nil { return nil, err } + batch.CreatedAt = batch.CreatedAt.UTC() return &batch, nil } func UnmarshalMinibatchRecord(item commondynamodb.Item) (*batcher.MinibatchRecord, error) { - minibatch := batcher.MinibatchRecord{} - err := attributevalue.UnmarshalMap(item, &minibatch) + dbr := DynamoMinibatchRecord{} + err := attributevalue.UnmarshalMap(item, &dbr) + if err != nil { + return nil, err + } + + minibatch, err := FromDynamoMinibatchRecord(dbr) if err != nil { return nil, err } @@ -162,19 +310,29 @@ func UnmarshalMinibatchRecord(item commondynamodb.Item) (*batcher.MinibatchRecor } func UnmarshalDispersalRequest(item commondynamodb.Item) (*batcher.DispersalRequest, error) { - request := batcher.DispersalRequest{} - err := attributevalue.UnmarshalMap(item, &request) + ddr := DynamoDispersalRequest{} + err := attributevalue.UnmarshalMap(item, &ddr) if err != nil { return nil, fmt.Errorf("failed to unmarshal dispersal request from DynamoDB: %v", err) } + request, err := FromDynamoDispersalRequest(ddr) + if err != nil { + return nil, err + } + request.RequestedAt = request.RequestedAt.UTC() return &request, nil } func UnmarshalDispersalResponse(item commondynamodb.Item) (*batcher.DispersalResponse, error) { - response := batcher.DispersalResponse{} - err := attributevalue.UnmarshalMap(item, &response) + ddr := DynamoDispersalResponse{} + err := attributevalue.UnmarshalMap(item, &ddr) + if err != nil { + return nil, err + } + + response, err := FromDynamoDispersalResponse(ddr) if err != nil { return nil, err } @@ -222,11 +380,11 @@ func (m *MinibatchStore) PutDispersalResponse(ctx context.Context, response *bat func (m *MinibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batcher.BatchRecord, error) { item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ - "PK": &types.AttributeValueMemberS{ - Value: batchKey + batchID.String(), + "BatchID": &types.AttributeValueMemberS{ + Value: batchID.String(), }, "SK": &types.AttributeValueMemberS{ - Value: batchKey + batchID.String(), + Value: batchSK + batchID.String(), }, }) if err != nil { @@ -248,11 +406,11 @@ func (m *MinibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batc func (m *MinibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.MinibatchRecord, error) { item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ - "PK": &types.AttributeValueMemberS{ - Value: batchKey + batchID.String(), + "BatchID": &types.AttributeValueMemberS{ + Value: batchID.String(), }, "SK": &types.AttributeValueMemberS{ - Value: minibatchKey + fmt.Sprintf("%d", minibatchIndex), + Value: minibatchSK + fmt.Sprintf("%d", minibatchIndex), }, }) if err != nil { @@ -274,11 +432,11 @@ func (m *MinibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, mi func (m *MinibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.DispersalRequest, error) { item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ - "PK": &types.AttributeValueMemberS{ - Value: batchKey + batchID.String(), + "BatchID": &types.AttributeValueMemberS{ + Value: batchID.String(), }, "SK": &types.AttributeValueMemberS{ - Value: dispersalRequestKey + fmt.Sprintf("%d", minibatchIndex), + Value: dispersalRequestSK + fmt.Sprintf("%d", minibatchIndex), }, }) if err != nil { @@ -300,11 +458,11 @@ func (m *MinibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.U func (m *MinibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.DispersalResponse, error) { item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ - "PK": &types.AttributeValueMemberS{ - Value: batchKey + batchID.String(), + "BatchID": &types.AttributeValueMemberS{ + Value: batchID.String(), }, "SK": &types.AttributeValueMemberS{ - Value: dispersalResponseKey + fmt.Sprintf("%d", minibatchIndex), + Value: dispersalResponseSK + fmt.Sprintf("%d", minibatchIndex), }, }) if err != nil { diff --git a/disperser/batcher/batchstore/minibatch_store_test.go b/disperser/batcher/batchstore/minibatch_store_test.go index 75f02dd681..2f072b9f39 100644 --- a/disperser/batcher/batchstore/minibatch_store_test.go +++ b/disperser/batcher/batchstore/minibatch_store_test.go @@ -28,7 +28,7 @@ var ( dockertestResource *dockertest.Resource deployLocalStack bool - localStackPort = "4570" + localStackPort = "4566" dynamoClient *dynamodb.Client minibatchStore *batchstore.MinibatchStore @@ -165,5 +165,4 @@ func TestPutDispersalResponse(t *testing.T) { r, err := minibatchStore.GetDispersalResponse(ctx, response.BatchID, response.MinibatchIndex) assert.NoError(t, err) assert.Equal(t, response, r) - assert.Error(t, err) } diff --git a/disperser/batcher/minibatch_store.go b/disperser/batcher/minibatch_store.go index 330fdf4121..d1e3ed24c1 100644 --- a/disperser/batcher/minibatch_store.go +++ b/disperser/batcher/minibatch_store.go @@ -47,9 +47,9 @@ type MinibatchRecord struct { } type DispersalRequest struct { - BatchID uuid.UUID - MinibatchIndex uint - OperatorID [32]byte + BatchID uuid.UUID + MinibatchIndex uint + core.OperatorID OperatorAddress gcommon.Address Socket string NumBlobs uint From 7fdf62e2f44c941f6ca4865b0f0f14f0354df1f6 Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Tue, 23 Jul 2024 18:47:46 -0700 Subject: [PATCH 4/8] Add Query support for primary index --- common/aws/dynamodb/client.go | 14 +++ .../batcher/batchstore/minibatch_store.go | 85 +++++++++++++++++-- .../batchstore/minibatch_store_test.go | 10 ++- 3 files changed, 99 insertions(+), 10 deletions(-) diff --git a/common/aws/dynamodb/client.go b/common/aws/dynamodb/client.go index 04cc7d75ab..db31cb80e0 100644 --- a/common/aws/dynamodb/client.go +++ b/common/aws/dynamodb/client.go @@ -166,6 +166,20 @@ func (c *Client) QueryIndex(ctx context.Context, tableName string, indexName str return response.Items, nil } +// Query returns all items in the primary index that match the given expression +func (c *Client) Query(ctx context.Context, tableName string, keyCondition string, expAttributeValues ExpresseionValues) ([]Item, error) { + response, err := c.dynamoClient.Query(ctx, &dynamodb.QueryInput{ + TableName: aws.String(tableName), + KeyConditionExpression: aws.String(keyCondition), + ExpressionAttributeValues: expAttributeValues, + }) + if err != nil { + return nil, err + } + + return response.Items, nil +} + // QueryIndexCount returns the count of the items in the index that match the given key func (c *Client) QueryIndexCount(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpresseionValues) (int32, error) { response, err := c.dynamoClient.Query(ctx, &dynamodb.QueryInput{ diff --git a/disperser/batcher/batchstore/minibatch_store.go b/disperser/batcher/batchstore/minibatch_store.go index d4ba8d8a63..9d7821b1f9 100644 --- a/disperser/batcher/batchstore/minibatch_store.go +++ b/disperser/batcher/batchstore/minibatch_store.go @@ -63,6 +63,8 @@ type MinibatchStore struct { ttl time.Duration } +var _ batcher.MinibatchStore = (*MinibatchStore)(nil) + func NewMinibatchStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *MinibatchStore { logger.Debugf("creating minibatch store with table %s with TTL: %s", tableName, ttl) return &MinibatchStore{ @@ -263,7 +265,7 @@ func MarshalDispersalRequest(request *batcher.DispersalRequest) (map[string]type if err != nil { return nil, err } - fields["SK"] = &types.AttributeValueMemberS{Value: dispersalRequestSK + fmt.Sprintf("%d", request.MinibatchIndex)} + fields["SK"] = &types.AttributeValueMemberS{Value: dispersalRequestSK + fmt.Sprintf("%d#%s", request.MinibatchIndex, request.OperatorID.Hex())} fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", request.RequestedAt.UTC().Unix())} return fields, nil } @@ -273,7 +275,7 @@ func MarshalDispersalResponse(response *batcher.DispersalResponse) (map[string]t if err != nil { return nil, err } - fields["SK"] = &types.AttributeValueMemberS{Value: dispersalResponseSK + fmt.Sprintf("%d", response.MinibatchIndex)} + fields["SK"] = &types.AttributeValueMemberS{Value: dispersalResponseSK + fmt.Sprintf("%d#%s", response.MinibatchIndex, response.OperatorID.Hex())} fields["RespondedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.RespondedAt.UTC().Unix())} fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.DispersalRequest.RequestedAt.UTC().Unix())} return fields, nil @@ -359,6 +361,10 @@ func (m *MinibatchStore) PutMinibatch(ctx context.Context, minibatch *batcher.Mi return m.dynamoDBClient.PutItem(ctx, m.tableName, item) } +func (m *MinibatchStore) PutMiniBatch(ctx context.Context, minibatch *batcher.MinibatchRecord) error { + return m.PutMinibatch(ctx, minibatch) +} + func (m *MinibatchStore) PutDispersalRequest(ctx context.Context, request *batcher.DispersalRequest) error { item, err := MarshalDispersalRequest(request) if err != nil { @@ -404,6 +410,11 @@ func (m *MinibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batc return batch, nil } +// GetPendingBatch implements batcher.MinibatchStore. +func (m *MinibatchStore) GetPendingBatch(ctx context.Context) (*batcher.BatchRecord, error) { + panic("unimplemented") +} + func (m *MinibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.MinibatchRecord, error) { item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ "BatchID": &types.AttributeValueMemberS{ @@ -430,13 +441,17 @@ func (m *MinibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, mi return minibatch, nil } -func (m *MinibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.DispersalRequest, error) { +func (m *MinibatchStore) GetMiniBatch(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.MinibatchRecord, error) { + return m.GetMinibatch(ctx, batchID, minibatchIndex) +} + +func (m *MinibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*batcher.DispersalRequest, error) { item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ "BatchID": &types.AttributeValueMemberS{ Value: batchID.String(), }, "SK": &types.AttributeValueMemberS{ - Value: dispersalRequestSK + fmt.Sprintf("%d", minibatchIndex), + Value: dispersalRequestSK + fmt.Sprintf("%d#%s", minibatchIndex, opID.Hex()), }, }) if err != nil { @@ -456,13 +471,42 @@ func (m *MinibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.U return request, nil } -func (m *MinibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.DispersalResponse, error) { +func (m *MinibatchStore) GetDispersalRequests(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalRequest, error) { + items, err := m.dynamoDBClient.Query(ctx, m.tableName, "BatchID = :batchID AND SK = :sk", commondynamodb.ExpresseionValues{ + ":batchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + ":sk": &types.AttributeValueMemberS{ + Value: dispersalRequestSK + fmt.Sprintf("%s#%d", batchID.String(), minibatchIndex), + }, + }) + if err != nil { + return nil, err + } + + if len(items) == 0 { + return nil, fmt.Errorf("no dispersal requests found for BatchID %s MinibatchIndex %d", batchID, minibatchIndex) + } + + requests := make([]*batcher.DispersalRequest, len(items)) + for i, item := range items { + requests[i], err = UnmarshalDispersalRequest(item) + if err != nil { + m.logger.Errorf("failed to unmarshal dispersal requests at index %d: %v", i, err) + return nil, err + } + } + + return requests, nil +} + +func (m *MinibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*batcher.DispersalResponse, error) { item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ "BatchID": &types.AttributeValueMemberS{ Value: batchID.String(), }, "SK": &types.AttributeValueMemberS{ - Value: dispersalResponseSK + fmt.Sprintf("%d", minibatchIndex), + Value: dispersalResponseSK + fmt.Sprintf("%d#%s", minibatchIndex, opID.Hex()), }, }) if err != nil { @@ -481,3 +525,32 @@ func (m *MinibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid. } return response, nil } + +func (m *MinibatchStore) GetDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalResponse, error) { + items, err := m.dynamoDBClient.Query(ctx, m.tableName, "BatchID = :batchID AND SK = :sk", commondynamodb.ExpresseionValues{ + ":batchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + ":sk": &types.AttributeValueMemberS{ + Value: dispersalResponseSK + fmt.Sprintf("%s#%d", batchID.String(), minibatchIndex), + }, + }) + if err != nil { + return nil, err + } + + if len(items) == 0 { + return nil, fmt.Errorf("no dispersal responses found for BatchID %s MinibatchIndex %d", batchID, minibatchIndex) + } + + responses := make([]*batcher.DispersalResponse, len(items)) + for i, item := range items { + responses[i], err = UnmarshalDispersalResponse(item) + if err != nil { + m.logger.Errorf("failed to unmarshal dispersal response at index %d: %v", i, err) + return nil, err + } + } + + return responses, nil +} diff --git a/disperser/batcher/batchstore/minibatch_store_test.go b/disperser/batcher/batchstore/minibatch_store_test.go index 2f072b9f39..48041ce9be 100644 --- a/disperser/batcher/batchstore/minibatch_store_test.go +++ b/disperser/batcher/batchstore/minibatch_store_test.go @@ -127,17 +127,18 @@ func TestPutDispersalRequest(t *testing.T) { id, err := uuid.NewV7() assert.NoError(t, err) ts := time.Now().Truncate(time.Second).UTC() + opID := core.OperatorID([32]byte{123}) request := &batcher.DispersalRequest{ BatchID: id, MinibatchIndex: 0, - OperatorID: core.OperatorID([32]byte{123}), + OperatorID: opID, OperatorAddress: gcommon.HexToAddress("0x0"), NumBlobs: 1, RequestedAt: ts, } err = minibatchStore.PutDispersalRequest(ctx, request) assert.NoError(t, err) - r, err := minibatchStore.GetDispersalRequest(ctx, request.BatchID, request.MinibatchIndex) + r, err := minibatchStore.GetDispersalRequest(ctx, request.BatchID, request.MinibatchIndex, opID) assert.NoError(t, err) assert.Equal(t, request, r) } @@ -147,11 +148,12 @@ func TestPutDispersalResponse(t *testing.T) { id, err := uuid.NewV7() assert.NoError(t, err) ts := time.Now().Truncate(time.Second).UTC() + opID := core.OperatorID([32]byte{123}) response := &batcher.DispersalResponse{ DispersalRequest: batcher.DispersalRequest{ BatchID: id, MinibatchIndex: 0, - OperatorID: core.OperatorID([32]byte{1}), + OperatorID: opID, OperatorAddress: gcommon.HexToAddress("0x0"), NumBlobs: 1, RequestedAt: ts, @@ -162,7 +164,7 @@ func TestPutDispersalResponse(t *testing.T) { } err = minibatchStore.PutDispersalResponse(ctx, response) assert.NoError(t, err) - r, err := minibatchStore.GetDispersalResponse(ctx, response.BatchID, response.MinibatchIndex) + r, err := minibatchStore.GetDispersalResponse(ctx, response.BatchID, response.MinibatchIndex, opID) assert.NoError(t, err) assert.Equal(t, response, r) } From 381a1c07635247d46ed81cffc50aa4480acd9332 Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Wed, 24 Jul 2024 23:53:26 -0700 Subject: [PATCH 5/8] Add batch status GSI Add remaining minibatch store functions --- .../batcher/batchstore/minibatch_store.go | 252 ++++++++++++++++-- .../batchstore/minibatch_store_test.go | 202 +++++++++++++- .../batcher/inmem/minibatch_store_test.go | 9 + disperser/batcher/minibatch_store.go | 2 + 4 files changed, 442 insertions(+), 23 deletions(-) diff --git a/disperser/batcher/batchstore/minibatch_store.go b/disperser/batcher/batchstore/minibatch_store.go index 9d7821b1f9..fb05f258ec 100644 --- a/disperser/batcher/batchstore/minibatch_store.go +++ b/disperser/batcher/batchstore/minibatch_store.go @@ -3,6 +3,7 @@ package batchstore import ( "context" "fmt" + "strconv" "time" commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" @@ -18,16 +19,19 @@ import ( ) const ( - batchSK = "BATCH#" - minibatchSK = "MINIBATCH#" - dispersalRequestSK = "DISPERSAL_REQUEST#" - dispersalResponseSK = "DISPERSAL_RESPONSE#" + blobMetadataIndexName = "BlobMetadataIndex" + batchStatusIndexName = "BatchStatusIndex" + batchSK = "BATCH#" + minibatchSK = "MINIBATCH#" + dispersalRequestSK = "DISPERSAL_REQUEST#" + dispersalResponseSK = "DISPERSAL_RESPONSE#" ) type DynamoBatchRecord struct { BatchID string CreatedAt time.Time ReferenceBlockNumber uint + BatchStatus batcher.BatchStatus HeaderHash [32]byte AggregatePubKey *core.G2Point AggregateSignature *core.Signature @@ -48,6 +52,8 @@ type DynamoDispersalRequest struct { OperatorAddress string NumBlobs uint RequestedAt time.Time + BlobHash string + MetadataHash string } type DynamoDispersalResponse struct { @@ -87,13 +93,21 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit AttributeType: types.ScalarAttributeTypeS, }, { - AttributeName: aws.String("OperatorID"), - AttributeType: types.ScalarAttributeTypeS, + AttributeName: aws.String("BatchStatus"), + AttributeType: types.ScalarAttributeTypeN, }, { - AttributeName: aws.String("RequestedAt"), + AttributeName: aws.String("CreatedAt"), AttributeType: types.ScalarAttributeTypeN, }, + { + AttributeName: aws.String("BlobHash"), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String("MetadataHash"), + AttributeType: types.ScalarAttributeTypeS, + }, }, KeySchema: []types.KeySchemaElement{ { @@ -108,14 +122,34 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit TableName: aws.String(tableName), GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ { - IndexName: aws.String("OperatorID_RequestedAt_Index"), + IndexName: aws.String(batchStatusIndexName), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("BatchStatus"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("CreatedAt"), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + }, + { + IndexName: aws.String(blobMetadataIndexName), KeySchema: []types.KeySchemaElement{ { - AttributeName: aws.String("OperatorID"), + AttributeName: aws.String("BlobHash"), KeyType: types.KeyTypeHash, }, { - AttributeName: aws.String("RequestedAt"), + AttributeName: aws.String("MetadataHash"), KeyType: types.KeyTypeRange, }, }, @@ -140,6 +174,7 @@ func ToDynamoBatchRecord(br batcher.BatchRecord) DynamoBatchRecord { BatchID: br.ID.String(), CreatedAt: br.CreatedAt, ReferenceBlockNumber: br.ReferenceBlockNumber, + BatchStatus: br.Status, HeaderHash: br.HeaderHash, AggregatePubKey: br.AggregatePubKey, AggregateSignature: br.AggregateSignature, @@ -164,6 +199,8 @@ func ToDynamoDispersalRequest(dr batcher.DispersalRequest) DynamoDispersalReques OperatorAddress: dr.OperatorAddress.Hex(), NumBlobs: dr.NumBlobs, RequestedAt: dr.RequestedAt, + BlobHash: dr.BlobHash, + MetadataHash: dr.MetadataHash, } } @@ -186,6 +223,7 @@ func FromDynamoBatchRecord(dbr DynamoBatchRecord) (batcher.BatchRecord, error) { ID: batchID, CreatedAt: dbr.CreatedAt, ReferenceBlockNumber: dbr.ReferenceBlockNumber, + Status: dbr.BatchStatus, HeaderHash: dbr.HeaderHash, AggregatePubKey: dbr.AggregatePubKey, AggregateSignature: dbr.AggregateSignature, @@ -224,6 +262,8 @@ func FromDynamoDispersalRequest(ddr DynamoDispersalRequest) (batcher.DispersalRe OperatorAddress: gcommon.HexToAddress(ddr.OperatorAddress), NumBlobs: ddr.NumBlobs, RequestedAt: ddr.RequestedAt, + BlobHash: ddr.BlobHash, + MetadataHash: ddr.MetadataHash, }, nil } @@ -361,17 +401,12 @@ func (m *MinibatchStore) PutMinibatch(ctx context.Context, minibatch *batcher.Mi return m.dynamoDBClient.PutItem(ctx, m.tableName, item) } -func (m *MinibatchStore) PutMiniBatch(ctx context.Context, minibatch *batcher.MinibatchRecord) error { - return m.PutMinibatch(ctx, minibatch) -} - func (m *MinibatchStore) PutDispersalRequest(ctx context.Context, request *batcher.DispersalRequest) error { item, err := MarshalDispersalRequest(request) if err != nil { return err } - fmt.Printf("%v", item) return m.dynamoDBClient.PutItem(ctx, m.tableName, item) } @@ -410,9 +445,82 @@ func (m *MinibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batc return batch, nil } -// GetPendingBatch implements batcher.MinibatchStore. -func (m *MinibatchStore) GetPendingBatch(ctx context.Context) (*batcher.BatchRecord, error) { - panic("unimplemented") +func (m *MinibatchStore) BatchDispersed(ctx context.Context, batchID uuid.UUID) (bool, error) { + dispersalRequests, err := m.GetDispersalRequests(ctx, batchID) + if err != nil { + return false, fmt.Errorf("failed to get dispersal requests for batch %s - %v", batchID.String(), err) + + } + dispersalResponses, err := m.GetDispersalResponses(ctx, batchID) + if err != nil { + return false, fmt.Errorf("failed to get dispersal responses for batch %s - %v", batchID.String(), err) + } + if len(dispersalRequests) != len(dispersalResponses) { + m.logger.Info("number of minibatch dispersal requests does not match responses", "batchID", batchID, "numRequests", len(dispersalRequests), "numResponses", len(dispersalResponses)) + return false, nil + } + if len(dispersalRequests) == 0 || len(dispersalResponses) == 0 { + m.logger.Info("no dispersal requests or responses found", "batchID", batchID) + return false, nil + } + return true, nil +} + +func (m *MinibatchStore) GetBatchesByStatus(ctx context.Context, status batcher.BatchStatus) ([]*batcher.BatchRecord, error) { + items, err := m.dynamoDBClient.QueryIndex(ctx, m.tableName, batchStatusIndexName, "BatchStatus = :status", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }}) + if err != nil { + return nil, err + } + + batches := make([]*batcher.BatchRecord, len(items)) + for i, item := range items { + batches[i], err = UnmarshalBatchRecord(item) + if err != nil { + m.logger.Errorf("failed to unmarshal batch record at index %d: %v", i, err) + return nil, err + } + } + + return batches, nil +} + +func (m *MinibatchStore) GetLatestFormedBatch(ctx context.Context) (batch *batcher.BatchRecord, minibatches []*batcher.MinibatchRecord, err error) { + formed, err := m.GetBatchesByStatus(ctx, batcher.BatchStatusFormed) + if err != nil { + return nil, nil, err + } + if len(formed) == 0 { + return nil, nil, nil + } + batch = formed[0] + minibatches, err = m.GetMinibatches(ctx, batch.ID) + if err != nil { + return nil, nil, err + } + return batch, minibatches, nil +} + +func (m *MinibatchStore) UpdateBatchStatus(ctx context.Context, batchID uuid.UUID, status batcher.BatchStatus) error { + if status < batcher.BatchStatusFormed || status > batcher.BatchStatusFailed { + return fmt.Errorf("invalid batch status %v", status) + } + _, err := m.dynamoDBClient.UpdateItem(ctx, m.tableName, map[string]types.AttributeValue{ + "BatchID": &types.AttributeValueMemberS{Value: batchID.String()}, + "SK": &types.AttributeValueMemberS{Value: batchSK + batchID.String()}, + }, commondynamodb.Item{ + "BatchStatus": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }, + }) + + if err != nil { + return fmt.Errorf("failed to update batch status: %v", err) + } + + return nil } func (m *MinibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.MinibatchRecord, error) { @@ -441,8 +549,29 @@ func (m *MinibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, mi return minibatch, nil } -func (m *MinibatchStore) GetMiniBatch(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.MinibatchRecord, error) { - return m.GetMinibatch(ctx, batchID, minibatchIndex) +func (m *MinibatchStore) GetMinibatches(ctx context.Context, batchID uuid.UUID) ([]*batcher.MinibatchRecord, error) { + items, err := m.dynamoDBClient.Query(ctx, m.tableName, "BatchID = :batchID AND begins_with(SK, :prefix)", commondynamodb.ExpresseionValues{ + ":batchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + ":prefix": &types.AttributeValueMemberS{ + Value: minibatchSK, + }, + }) + if err != nil { + return nil, err + } + + minibatches := make([]*batcher.MinibatchRecord, len(items)) + for i, item := range items { + minibatches[i], err = UnmarshalMinibatchRecord(item) + if err != nil { + m.logger.Errorf("failed to unmarshal minibatch record at index %d: %v", i, err) + return nil, err + } + } + + return minibatches, nil } func (m *MinibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*batcher.DispersalRequest, error) { @@ -471,7 +600,32 @@ func (m *MinibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.U return request, nil } -func (m *MinibatchStore) GetDispersalRequests(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalRequest, error) { +func (m *MinibatchStore) GetDispersalRequests(ctx context.Context, batchID uuid.UUID) ([]*batcher.DispersalRequest, error) { + items, err := m.dynamoDBClient.Query(ctx, m.tableName, "BatchID = :batchID AND begins_with(SK, :prefix)", commondynamodb.ExpresseionValues{ + ":batchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + ":prefix": &types.AttributeValueMemberS{ + Value: dispersalRequestSK, + }, + }) + if err != nil { + return nil, err + } + + requests := make([]*batcher.DispersalRequest, len(items)) + for i, item := range items { + requests[i], err = UnmarshalDispersalRequest(item) + if err != nil { + m.logger.Errorf("failed to unmarshal dispersal requests at index %d: %v", i, err) + return nil, err + } + } + + return requests, nil +} + +func (m *MinibatchStore) GetMinibatchDispersalRequests(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalRequest, error) { items, err := m.dynamoDBClient.Query(ctx, m.tableName, "BatchID = :batchID AND SK = :sk", commondynamodb.ExpresseionValues{ ":batchID": &types.AttributeValueMemberS{ Value: batchID.String(), @@ -526,7 +680,61 @@ func (m *MinibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid. return response, nil } -func (m *MinibatchStore) GetDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalResponse, error) { +func (m *MinibatchStore) GetDispersalResponsesByBlobMetada(ctx context.Context, blobHash string, metadataHash string) ([]*batcher.DispersalResponse, error) { + items, err := m.dynamoDBClient.QueryIndex(ctx, m.tableName, blobMetadataIndexName, "BlobHash = :blobHash AND MetadataHash = :metadataHash", commondynamodb.ExpresseionValues{ + ":blobHash": &types.AttributeValueMemberS{ + Value: blobHash, + }, + ":metadataHash": &types.AttributeValueMemberS{ + Value: metadataHash, + }, + }) + if err != nil { + return nil, err + } + + if len(items) == 0 { + return nil, fmt.Errorf("no dispersal responses found for BlobHash %s MetadataHash %s", blobHash, metadataHash) + } + + responses := make([]*batcher.DispersalResponse, len(items)) + for i, item := range items { + responses[i], err = UnmarshalDispersalResponse(item) + if err != nil { + m.logger.Errorf("failed to unmarshal dispersal response at index %d: %v", i, err) + return nil, err + } + } + + return responses, nil +} + +func (m *MinibatchStore) GetDispersalResponses(ctx context.Context, batchID uuid.UUID) ([]*batcher.DispersalResponse, error) { + items, err := m.dynamoDBClient.Query(ctx, m.tableName, "BatchID = :batchID AND begins_with(SK, :prefix)", commondynamodb.ExpresseionValues{ + ":batchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + ":prefix": &types.AttributeValueMemberS{ + Value: dispersalResponseSK, + }, + }) + if err != nil { + return nil, err + } + + responses := make([]*batcher.DispersalResponse, len(items)) + for i, item := range items { + responses[i], err = UnmarshalDispersalResponse(item) + if err != nil { + m.logger.Errorf("failed to unmarshal dispersal response at index %d: %v", i, err) + return nil, err + } + } + + return responses, nil +} + +func (m *MinibatchStore) GetMinibatchDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalResponse, error) { items, err := m.dynamoDBClient.Query(ctx, m.tableName, "BatchID = :batchID AND SK = :sk", commondynamodb.ExpresseionValues{ ":batchID": &types.AttributeValueMemberS{ Value: batchID.String(), diff --git a/disperser/batcher/batchstore/minibatch_store_test.go b/disperser/batcher/batchstore/minibatch_store_test.go index 48041ce9be..38c05a8fc2 100644 --- a/disperser/batcher/batchstore/minibatch_store_test.go +++ b/disperser/batcher/batchstore/minibatch_store_test.go @@ -67,7 +67,6 @@ func setup(m *testing.M) { panic("failed to create dynamodb client: " + err.Error()) } - fmt.Printf("m: %v\n", m) minibatchStore = batchstore.NewMinibatchStore(dynamoClient, logger, minibatchTableName, time.Hour) } @@ -93,6 +92,7 @@ func TestPutBatch(t *testing.T) { ID: id, CreatedAt: ts, ReferenceBlockNumber: 1, + Status: batcher.BatchStatusPending, HeaderHash: [32]byte{1}, AggregatePubKey: nil, AggregateSignature: nil, @@ -102,6 +102,73 @@ func TestPutBatch(t *testing.T) { b, err := minibatchStore.GetBatch(ctx, batch.ID) assert.NoError(t, err) assert.Equal(t, batch, b) + err = minibatchStore.UpdateBatchStatus(ctx, batch.ID, batcher.BatchStatusFormed) + assert.NoError(t, err) + err = minibatchStore.UpdateBatchStatus(ctx, batch.ID, 4) + assert.Error(t, err) + u, err := minibatchStore.GetBatch(ctx, batch.ID) + assert.NoError(t, err) + assert.Equal(t, u.Status, batcher.BatchStatusFormed) + assert.Equal(t, batch, b) +} + +func TestGetBatchesByStatus(t *testing.T) { + ctx := context.Background() + id1, _ := uuid.NewV7() + id2, _ := uuid.NewV7() + id3, _ := uuid.NewV7() + ts := time.Now().Truncate(time.Second).UTC() + batch1 := &batcher.BatchRecord{ + ID: id1, + CreatedAt: ts, + ReferenceBlockNumber: 1, + Status: batcher.BatchStatusAttested, + HeaderHash: [32]byte{1}, + AggregatePubKey: nil, + AggregateSignature: nil, + } + _ = minibatchStore.PutBatch(ctx, batch1) + batch2 := &batcher.BatchRecord{ + ID: id2, + CreatedAt: ts, + ReferenceBlockNumber: 1, + Status: batcher.BatchStatusAttested, + HeaderHash: [32]byte{1}, + AggregatePubKey: nil, + AggregateSignature: nil, + } + err := minibatchStore.PutBatch(ctx, batch2) + assert.NoError(t, err) + batch3 := &batcher.BatchRecord{ + ID: id3, + CreatedAt: ts, + ReferenceBlockNumber: 1, + Status: batcher.BatchStatusAttested, + HeaderHash: [32]byte{1}, + AggregatePubKey: nil, + AggregateSignature: nil, + } + err = minibatchStore.PutBatch(ctx, batch3) + assert.NoError(t, err) + + pending, err := minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusAttested) + assert.NoError(t, err) + assert.Equal(t, 3, len(pending)) + + err = minibatchStore.UpdateBatchStatus(ctx, id1, batcher.BatchStatusFormed) + assert.NoError(t, err) + + formed, err := minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusFormed) + assert.NoError(t, err) + assert.Equal(t, 2, len(formed)) + + pending, err = minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusAttested) + assert.NoError(t, err) + assert.Equal(t, 2, len(pending)) + + failed, err := minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusFailed) + assert.NoError(t, err) + assert.Equal(t, 0, len(failed)) } func TestPutMinibatch(t *testing.T) { @@ -122,6 +189,74 @@ func TestPutMinibatch(t *testing.T) { assert.Equal(t, minibatch, m) } +func TestGetLatestFormedBatch(t *testing.T) { + ctx := context.Background() + id1, _ := uuid.NewV7() + id2, _ := uuid.NewV7() + ts := time.Now().Truncate(time.Second).UTC() + batch1 := &batcher.BatchRecord{ + ID: id1, + CreatedAt: ts, + ReferenceBlockNumber: 1, + Status: batcher.BatchStatusPending, + HeaderHash: [32]byte{1}, + AggregatePubKey: nil, + AggregateSignature: nil, + } + minibatch1 := &batcher.MinibatchRecord{ + BatchID: id1, + MinibatchIndex: 1, + BlobHeaderHashes: [][32]byte{{1}}, + BatchSize: 1, + ReferenceBlockNumber: 1, + } + minibatch2 := &batcher.MinibatchRecord{ + BatchID: id1, + MinibatchIndex: 2, + BlobHeaderHashes: [][32]byte{{1}}, + BatchSize: 1, + ReferenceBlockNumber: 1, + } + batch2 := &batcher.BatchRecord{ + ID: id2, + CreatedAt: ts, + ReferenceBlockNumber: 1, + Status: batcher.BatchStatusPending, + HeaderHash: [32]byte{1}, + AggregatePubKey: nil, + AggregateSignature: nil, + } + minibatch3 := &batcher.MinibatchRecord{ + BatchID: id2, + MinibatchIndex: 1, + BlobHeaderHashes: [][32]byte{{1}}, + BatchSize: 1, + ReferenceBlockNumber: 1, + } + err := minibatchStore.PutBatch(ctx, batch1) + assert.NoError(t, err) + err = minibatchStore.PutMinibatch(ctx, minibatch1) + assert.NoError(t, err) + err = minibatchStore.PutMinibatch(ctx, minibatch2) + assert.NoError(t, err) + err = minibatchStore.PutBatch(ctx, batch2) + assert.NoError(t, err) + err = minibatchStore.PutMinibatch(ctx, minibatch3) + assert.NoError(t, err) + + batch, minibatches, err := minibatchStore.GetLatestFormedBatch(ctx) + assert.NoError(t, err) + //assert.Equal(t, (*batcher.BatchRecord)(nil), batch) + //assert.Equal(t, []*batcher.MinibatchRecord([]*batcher.MinibatchRecord(nil)), minibatches) + + err = minibatchStore.UpdateBatchStatus(ctx, id1, batcher.BatchStatusFormed) + assert.NoError(t, err) + + batch, minibatches, err = minibatchStore.GetLatestFormedBatch(ctx) + assert.NoError(t, err) + assert.Equal(t, 2, len(minibatches)) + assert.Equal(t, batch.ID, batch1.ID) +} func TestPutDispersalRequest(t *testing.T) { ctx := context.Background() id, err := uuid.NewV7() @@ -135,6 +270,8 @@ func TestPutDispersalRequest(t *testing.T) { OperatorAddress: gcommon.HexToAddress("0x0"), NumBlobs: 1, RequestedAt: ts, + BlobHash: "blobHash", + MetadataHash: "metadataHash", } err = minibatchStore.PutDispersalRequest(ctx, request) assert.NoError(t, err) @@ -149,6 +286,8 @@ func TestPutDispersalResponse(t *testing.T) { assert.NoError(t, err) ts := time.Now().Truncate(time.Second).UTC() opID := core.OperatorID([32]byte{123}) + blobHash := "blobHash" + metadataHash := "metadataHash" response := &batcher.DispersalResponse{ DispersalRequest: batcher.DispersalRequest{ BatchID: id, @@ -157,6 +296,8 @@ func TestPutDispersalResponse(t *testing.T) { OperatorAddress: gcommon.HexToAddress("0x0"), NumBlobs: 1, RequestedAt: ts, + BlobHash: blobHash, + MetadataHash: metadataHash, }, Signatures: nil, RespondedAt: ts, @@ -167,4 +308,63 @@ func TestPutDispersalResponse(t *testing.T) { r, err := minibatchStore.GetDispersalResponse(ctx, response.BatchID, response.MinibatchIndex, opID) assert.NoError(t, err) assert.Equal(t, response, r) + rs, err := minibatchStore.GetDispersalResponsesByBlobMetada(ctx, blobHash, metadataHash) + assert.NoError(t, err) + assert.Equal(t, response, rs[0]) +} + +func TestDispersalStatus(t *testing.T) { + ctx := context.Background() + id, err := uuid.NewV7() + assert.NoError(t, err) + ts := time.Now().Truncate(time.Second).UTC() + opID := core.OperatorID([32]byte{123}) + blobHash := "blobHash" + metadataHash := "metadataHash" + + // no dispersals + dispersed, err := minibatchStore.BatchDispersed(ctx, id) + assert.NoError(t, err) + assert.False(t, dispersed) + + request := &batcher.DispersalRequest{ + BatchID: id, + MinibatchIndex: 0, + OperatorID: opID, + OperatorAddress: gcommon.HexToAddress("0x0"), + NumBlobs: 1, + RequestedAt: ts, + BlobHash: blobHash, + MetadataHash: metadataHash, + } + err = minibatchStore.PutDispersalRequest(ctx, request) + assert.NoError(t, err) + + // dispersal request but no response + dispersed, err = minibatchStore.BatchDispersed(ctx, id) + assert.NoError(t, err) + assert.False(t, dispersed) + + response := &batcher.DispersalResponse{ + DispersalRequest: batcher.DispersalRequest{ + BatchID: id, + MinibatchIndex: 0, + OperatorID: opID, + OperatorAddress: gcommon.HexToAddress("0x0"), + NumBlobs: 1, + RequestedAt: ts, + BlobHash: blobHash, + MetadataHash: metadataHash, + }, + Signatures: nil, + RespondedAt: ts, + Error: nil, + } + err = minibatchStore.PutDispersalResponse(ctx, response) + assert.NoError(t, err) + + // dispersal request and response + dispersed, err = minibatchStore.BatchDispersed(ctx, id) + assert.NoError(t, err) + assert.True(t, dispersed) } diff --git a/disperser/batcher/inmem/minibatch_store_test.go b/disperser/batcher/inmem/minibatch_store_test.go index b9e2f02f44..29b30294e4 100644 --- a/disperser/batcher/inmem/minibatch_store_test.go +++ b/disperser/batcher/inmem/minibatch_store_test.go @@ -26,6 +26,7 @@ func TestPutBatch(t *testing.T) { ID: id, CreatedAt: time.Now().UTC(), ReferenceBlockNumber: 1, + Status: 1, HeaderHash: [32]byte{1}, AggregatePubKey: nil, AggregateSignature: nil, @@ -70,6 +71,8 @@ func TestPutDispersalRequest(t *testing.T) { OperatorAddress: gcommon.HexToAddress("0x0"), NumBlobs: 1, RequestedAt: time.Now().UTC(), + BlobHash: "1a2b", + MetadataHash: "3c4d", } err = s.PutDispersalRequest(ctx, req1) assert.NoError(t, err) @@ -80,6 +83,8 @@ func TestPutDispersalRequest(t *testing.T) { OperatorAddress: gcommon.HexToAddress("0x0"), NumBlobs: 1, RequestedAt: time.Now().UTC(), + BlobHash: "1a2b", + MetadataHash: "3c4d", } err = s.PutDispersalRequest(ctx, req2) assert.NoError(t, err) @@ -113,6 +118,8 @@ func TestPutDispersalResponse(t *testing.T) { OperatorAddress: gcommon.HexToAddress("0x0"), NumBlobs: 1, RequestedAt: time.Now().UTC(), + BlobHash: "1a2b", + MetadataHash: "3c4d", }, Signatures: nil, RespondedAt: time.Now().UTC(), @@ -126,6 +133,8 @@ func TestPutDispersalResponse(t *testing.T) { OperatorAddress: gcommon.HexToAddress("0x0"), NumBlobs: 1, RequestedAt: time.Now().UTC(), + BlobHash: "0x0", + MetadataHash: "0x0", }, Signatures: nil, RespondedAt: time.Now().UTC(), diff --git a/disperser/batcher/minibatch_store.go b/disperser/batcher/minibatch_store.go index d1e3ed24c1..d538c4384c 100644 --- a/disperser/batcher/minibatch_store.go +++ b/disperser/batcher/minibatch_store.go @@ -54,6 +54,8 @@ type DispersalRequest struct { Socket string NumBlobs uint RequestedAt time.Time + BlobHash string + MetadataHash string } type DispersalResponse struct { From 027d3f68001060d1bbf4c9dc859c2d25a33f66d7 Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Thu, 25 Jul 2024 10:16:15 -0700 Subject: [PATCH 6/8] Update tests Remove blobMetadataIndex --- .../batcher/batchstore/minibatch_store.go | 59 +------------------ .../batchstore/minibatch_store_test.go | 40 +++++-------- 2 files changed, 16 insertions(+), 83 deletions(-) diff --git a/disperser/batcher/batchstore/minibatch_store.go b/disperser/batcher/batchstore/minibatch_store.go index fb05f258ec..066aa98bf5 100644 --- a/disperser/batcher/batchstore/minibatch_store.go +++ b/disperser/batcher/batchstore/minibatch_store.go @@ -100,14 +100,6 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit AttributeName: aws.String("CreatedAt"), AttributeType: types.ScalarAttributeTypeN, }, - { - AttributeName: aws.String("BlobHash"), - AttributeType: types.ScalarAttributeTypeS, - }, - { - AttributeName: aws.String("MetadataHash"), - AttributeType: types.ScalarAttributeTypeS, - }, }, KeySchema: []types.KeySchemaElement{ { @@ -141,26 +133,6 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit WriteCapacityUnits: aws.Int64(writeCapacityUnits), }, }, - { - IndexName: aws.String(blobMetadataIndexName), - KeySchema: []types.KeySchemaElement{ - { - AttributeName: aws.String("BlobHash"), - KeyType: types.KeyTypeHash, - }, - { - AttributeName: aws.String("MetadataHash"), - KeyType: types.KeyTypeRange, - }, - }, - Projection: &types.Projection{ - ProjectionType: types.ProjectionTypeAll, - }, - ProvisionedThroughput: &types.ProvisionedThroughput{ - ReadCapacityUnits: aws.Int64(readCapacityUnits), - WriteCapacityUnits: aws.Int64(writeCapacityUnits), - }, - }, }, ProvisionedThroughput: &types.ProvisionedThroughput{ ReadCapacityUnits: aws.Int64(readCapacityUnits), @@ -495,7 +467,7 @@ func (m *MinibatchStore) GetLatestFormedBatch(ctx context.Context) (batch *batch if len(formed) == 0 { return nil, nil, nil } - batch = formed[0] + batch = formed[len(formed)-1] minibatches, err = m.GetMinibatches(ctx, batch.ID) if err != nil { return nil, nil, err @@ -680,35 +652,6 @@ func (m *MinibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid. return response, nil } -func (m *MinibatchStore) GetDispersalResponsesByBlobMetada(ctx context.Context, blobHash string, metadataHash string) ([]*batcher.DispersalResponse, error) { - items, err := m.dynamoDBClient.QueryIndex(ctx, m.tableName, blobMetadataIndexName, "BlobHash = :blobHash AND MetadataHash = :metadataHash", commondynamodb.ExpresseionValues{ - ":blobHash": &types.AttributeValueMemberS{ - Value: blobHash, - }, - ":metadataHash": &types.AttributeValueMemberS{ - Value: metadataHash, - }, - }) - if err != nil { - return nil, err - } - - if len(items) == 0 { - return nil, fmt.Errorf("no dispersal responses found for BlobHash %s MetadataHash %s", blobHash, metadataHash) - } - - responses := make([]*batcher.DispersalResponse, len(items)) - for i, item := range items { - responses[i], err = UnmarshalDispersalResponse(item) - if err != nil { - m.logger.Errorf("failed to unmarshal dispersal response at index %d: %v", i, err) - return nil, err - } - } - - return responses, nil -} - func (m *MinibatchStore) GetDispersalResponses(ctx context.Context, batchID uuid.UUID) ([]*batcher.DispersalResponse, error) { items, err := m.dynamoDBClient.Query(ctx, m.tableName, "BatchID = :batchID AND begins_with(SK, :prefix)", commondynamodb.ExpresseionValues{ ":batchID": &types.AttributeValueMemberS{ diff --git a/disperser/batcher/batchstore/minibatch_store_test.go b/disperser/batcher/batchstore/minibatch_store_test.go index 38c05a8fc2..21aa5bcd5b 100644 --- a/disperser/batcher/batchstore/minibatch_store_test.go +++ b/disperser/batcher/batchstore/minibatch_store_test.go @@ -194,11 +194,12 @@ func TestGetLatestFormedBatch(t *testing.T) { id1, _ := uuid.NewV7() id2, _ := uuid.NewV7() ts := time.Now().Truncate(time.Second).UTC() + ts2 := ts.Add(10 * time.Second) batch1 := &batcher.BatchRecord{ ID: id1, CreatedAt: ts, ReferenceBlockNumber: 1, - Status: batcher.BatchStatusPending, + Status: batcher.BatchStatusFormed, HeaderHash: [32]byte{1}, AggregatePubKey: nil, AggregateSignature: nil, @@ -210,52 +211,44 @@ func TestGetLatestFormedBatch(t *testing.T) { BatchSize: 1, ReferenceBlockNumber: 1, } - minibatch2 := &batcher.MinibatchRecord{ - BatchID: id1, - MinibatchIndex: 2, - BlobHeaderHashes: [][32]byte{{1}}, - BatchSize: 1, - ReferenceBlockNumber: 1, - } batch2 := &batcher.BatchRecord{ ID: id2, - CreatedAt: ts, + CreatedAt: ts2, ReferenceBlockNumber: 1, - Status: batcher.BatchStatusPending, + Status: batcher.BatchStatusFormed, HeaderHash: [32]byte{1}, AggregatePubKey: nil, AggregateSignature: nil, } - minibatch3 := &batcher.MinibatchRecord{ + minibatch2 := &batcher.MinibatchRecord{ BatchID: id2, MinibatchIndex: 1, BlobHeaderHashes: [][32]byte{{1}}, BatchSize: 1, ReferenceBlockNumber: 1, } + minibatch3 := &batcher.MinibatchRecord{ + BatchID: id2, + MinibatchIndex: 2, + BlobHeaderHashes: [][32]byte{{1}}, + BatchSize: 1, + ReferenceBlockNumber: 1, + } err := minibatchStore.PutBatch(ctx, batch1) assert.NoError(t, err) err = minibatchStore.PutMinibatch(ctx, minibatch1) assert.NoError(t, err) - err = minibatchStore.PutMinibatch(ctx, minibatch2) - assert.NoError(t, err) err = minibatchStore.PutBatch(ctx, batch2) assert.NoError(t, err) + err = minibatchStore.PutMinibatch(ctx, minibatch2) + assert.NoError(t, err) err = minibatchStore.PutMinibatch(ctx, minibatch3) assert.NoError(t, err) batch, minibatches, err := minibatchStore.GetLatestFormedBatch(ctx) assert.NoError(t, err) - //assert.Equal(t, (*batcher.BatchRecord)(nil), batch) - //assert.Equal(t, []*batcher.MinibatchRecord([]*batcher.MinibatchRecord(nil)), minibatches) - - err = minibatchStore.UpdateBatchStatus(ctx, id1, batcher.BatchStatusFormed) - assert.NoError(t, err) - - batch, minibatches, err = minibatchStore.GetLatestFormedBatch(ctx) - assert.NoError(t, err) assert.Equal(t, 2, len(minibatches)) - assert.Equal(t, batch.ID, batch1.ID) + assert.Equal(t, batch.ID, batch2.ID) } func TestPutDispersalRequest(t *testing.T) { ctx := context.Background() @@ -308,9 +301,6 @@ func TestPutDispersalResponse(t *testing.T) { r, err := minibatchStore.GetDispersalResponse(ctx, response.BatchID, response.MinibatchIndex, opID) assert.NoError(t, err) assert.Equal(t, response, r) - rs, err := minibatchStore.GetDispersalResponsesByBlobMetada(ctx, blobHash, metadataHash) - assert.NoError(t, err) - assert.Equal(t, response, rs[0]) } func TestDispersalStatus(t *testing.T) { From 431f72e54812db27e233968c71fc751687936c25 Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Thu, 25 Jul 2024 14:13:58 -0700 Subject: [PATCH 7/8] Refactor custom type handling Add unique constraint on (batchID,BATCH#batchID) --- common/aws/dynamodb/client.go | 12 + .../batcher/batchstore/minibatch_store.go | 290 +++++++----------- .../batchstore/minibatch_store_test.go | 47 ++- disperser/batcher/minibatch_store.go | 12 +- 4 files changed, 161 insertions(+), 200 deletions(-) diff --git a/common/aws/dynamodb/client.go b/common/aws/dynamodb/client.go index db31cb80e0..2e309a86bd 100644 --- a/common/aws/dynamodb/client.go +++ b/common/aws/dynamodb/client.go @@ -105,6 +105,18 @@ func (c *Client) PutItem(ctx context.Context, tableName string, item Item) (err return nil } +func (c *Client) PutItemWithCondition(ctx context.Context, tableName string, item Item, condition string) (err error) { + _, err = c.dynamoClient.PutItem(ctx, &dynamodb.PutItemInput{ + TableName: aws.String(tableName), Item: item, + ConditionExpression: aws.String(condition), + }) + if err != nil { + return err + } + + return nil +} + // PutItems puts items in batches of 25 items (which is a limit DynamoDB imposes) // It returns the items that failed to be put. func (c *Client) PutItems(ctx context.Context, tableName string, items []Item) ([]Item, error) { diff --git a/disperser/batcher/batchstore/minibatch_store.go b/disperser/batcher/batchstore/minibatch_store.go index 066aa98bf5..cf69c7b24d 100644 --- a/disperser/batcher/batchstore/minibatch_store.go +++ b/disperser/batcher/batchstore/minibatch_store.go @@ -14,54 +14,17 @@ import ( "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" - gcommon "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" ) const ( - blobMetadataIndexName = "BlobMetadataIndex" - batchStatusIndexName = "BatchStatusIndex" - batchSK = "BATCH#" - minibatchSK = "MINIBATCH#" - dispersalRequestSK = "DISPERSAL_REQUEST#" - dispersalResponseSK = "DISPERSAL_RESPONSE#" + batchStatusIndexName = "BatchStatusIndex" + batchSKPrefix = "BATCH#" + minibatchSKPrefix = "MINIBATCH#" + dispersalRequestSKPrefix = "DISPERSAL_REQUEST#" + dispersalResponseSKPrefix = "DISPERSAL_RESPONSE#" ) -type DynamoBatchRecord struct { - BatchID string - CreatedAt time.Time - ReferenceBlockNumber uint - BatchStatus batcher.BatchStatus - HeaderHash [32]byte - AggregatePubKey *core.G2Point - AggregateSignature *core.Signature -} - -type DynamoMinibatchRecord struct { - BatchID string - MinibatchIndex uint - BlobHeaderHashes [][32]byte - BatchSize uint64 - ReferenceBlockNumber uint -} - -type DynamoDispersalRequest struct { - BatchID string - MinibatchIndex uint - OperatorID string - OperatorAddress string - NumBlobs uint - RequestedAt time.Time - BlobHash string - MetadataHash string -} - -type DynamoDispersalResponse struct { - DynamoDispersalRequest - Signatures []*core.Signature - RespondedAt time.Time - Error error -} type MinibatchStore struct { dynamoDBClient *commondynamodb.Client tableName string @@ -141,215 +104,188 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit } } -func ToDynamoBatchRecord(br batcher.BatchRecord) DynamoBatchRecord { - return DynamoBatchRecord{ - BatchID: br.ID.String(), - CreatedAt: br.CreatedAt, - ReferenceBlockNumber: br.ReferenceBlockNumber, - BatchStatus: br.Status, - HeaderHash: br.HeaderHash, - AggregatePubKey: br.AggregatePubKey, - AggregateSignature: br.AggregateSignature, - } -} - -func ToDynamoMinibatchRecord(br batcher.MinibatchRecord) DynamoMinibatchRecord { - return DynamoMinibatchRecord{ - BatchID: br.BatchID.String(), - MinibatchIndex: br.MinibatchIndex, - BlobHeaderHashes: br.BlobHeaderHashes, - BatchSize: br.BatchSize, - ReferenceBlockNumber: br.ReferenceBlockNumber, +func MarshalBatchRecord(batch *batcher.BatchRecord) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(*batch) + if err != nil { + return nil, err } + fields["BatchID"] = &types.AttributeValueMemberS{Value: batch.ID.String()} + fields["BatchStatus"] = &types.AttributeValueMemberN{Value: strconv.Itoa(int(batch.Status))} + fields["SK"] = &types.AttributeValueMemberS{Value: batchSKPrefix + batch.ID.String()} + fields["CreatedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", batch.CreatedAt.UTC().Unix())} + return fields, nil } -func ToDynamoDispersalRequest(dr batcher.DispersalRequest) DynamoDispersalRequest { - return DynamoDispersalRequest{ - BatchID: dr.BatchID.String(), - MinibatchIndex: dr.MinibatchIndex, - OperatorID: dr.OperatorID.Hex(), - OperatorAddress: dr.OperatorAddress.Hex(), - NumBlobs: dr.NumBlobs, - RequestedAt: dr.RequestedAt, - BlobHash: dr.BlobHash, - MetadataHash: dr.MetadataHash, +func MarshalMinibatchRecord(minibatch *batcher.MinibatchRecord) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(*minibatch) + if err != nil { + return nil, err } + fields["BatchID"] = &types.AttributeValueMemberS{Value: minibatch.BatchID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: minibatchSKPrefix + fmt.Sprintf("%d", minibatch.MinibatchIndex)} + return fields, nil } -func ToDynamoDispersalResponse(dr batcher.DispersalResponse) DynamoDispersalResponse { - return DynamoDispersalResponse{ - DynamoDispersalRequest: ToDynamoDispersalRequest(dr.DispersalRequest), - Signatures: dr.Signatures, - RespondedAt: dr.RespondedAt, - Error: dr.Error, +func MarshalDispersalRequest(request *batcher.DispersalRequest) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(*request) + if err != nil { + return nil, err } + fields["BatchID"] = &types.AttributeValueMemberS{Value: request.BatchID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: dispersalRequestSKPrefix + fmt.Sprintf("%d#%s", request.MinibatchIndex, request.OperatorID.Hex())} + fields["OperatorID"] = &types.AttributeValueMemberS{Value: request.OperatorID.Hex()} + fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", request.RequestedAt.UTC().Unix())} + return fields, nil } -func FromDynamoBatchRecord(dbr DynamoBatchRecord) (batcher.BatchRecord, error) { - batchID, err := uuid.Parse(dbr.BatchID) +func MarshalDispersalResponse(response *batcher.DispersalResponse) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(*response) if err != nil { - return batcher.BatchRecord{}, fmt.Errorf("failed to convert dynamo batch record batch ID %v from string: %v", dbr.BatchID, err) + return nil, err } - - return batcher.BatchRecord{ - ID: batchID, - CreatedAt: dbr.CreatedAt, - ReferenceBlockNumber: dbr.ReferenceBlockNumber, - Status: dbr.BatchStatus, - HeaderHash: dbr.HeaderHash, - AggregatePubKey: dbr.AggregatePubKey, - AggregateSignature: dbr.AggregateSignature, - }, nil + fields["BatchID"] = &types.AttributeValueMemberS{Value: response.BatchID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: dispersalResponseSKPrefix + fmt.Sprintf("%d#%s", response.MinibatchIndex, response.OperatorID.Hex())} + fields["OperatorID"] = &types.AttributeValueMemberS{Value: response.OperatorID.Hex()} + fields["RespondedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.RespondedAt.UTC().Unix())} + fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.DispersalRequest.RequestedAt.UTC().Unix())} + return fields, nil } -func FromDynamoMinibatchRecord(dbr DynamoMinibatchRecord) (batcher.MinibatchRecord, error) { - batchID, err := uuid.Parse(dbr.BatchID) - if err != nil { - return batcher.MinibatchRecord{}, fmt.Errorf("failed to convert dynamo minibatch record batch ID %v from string: %v", dbr.BatchID, err) +func UnmarshalBatchID(item commondynamodb.Item) (*uuid.UUID, error) { + type BatchID struct { + BatchID string } - return batcher.MinibatchRecord{ - BatchID: batchID, - MinibatchIndex: dbr.MinibatchIndex, - BlobHeaderHashes: dbr.BlobHeaderHashes, - BatchSize: dbr.BatchSize, - ReferenceBlockNumber: dbr.ReferenceBlockNumber, - }, nil -} - -func FromDynamoDispersalRequest(ddr DynamoDispersalRequest) (batcher.DispersalRequest, error) { - batchID, err := uuid.Parse(ddr.BatchID) + batch := BatchID{} + err := attributevalue.UnmarshalMap(item, &batch) if err != nil { - return batcher.DispersalRequest{}, fmt.Errorf("failed to convert dynamo dispersal request batch ID %v from string: %v", ddr.BatchID, err) + return nil, err } - operatorID, err := core.OperatorIDFromHex(ddr.OperatorID) + + batchID, err := uuid.Parse(batch.BatchID) if err != nil { - return batcher.DispersalRequest{}, fmt.Errorf("failed to convert dynamo dispersal request operator ID %v from hex: %v", ddr.OperatorID, err) + return nil, err } - return batcher.DispersalRequest{ - BatchID: batchID, - MinibatchIndex: ddr.MinibatchIndex, - OperatorID: operatorID, - OperatorAddress: gcommon.HexToAddress(ddr.OperatorAddress), - NumBlobs: ddr.NumBlobs, - RequestedAt: ddr.RequestedAt, - BlobHash: ddr.BlobHash, - MetadataHash: ddr.MetadataHash, - }, nil + return &batchID, nil } -func FromDynamoDispersalResponse(ddr DynamoDispersalResponse) (batcher.DispersalResponse, error) { - request, err := FromDynamoDispersalRequest(ddr.DynamoDispersalRequest) - if err != nil { - return batcher.DispersalResponse{}, err +func UnmarshalBatchStatus(item commondynamodb.Item) (*batcher.BatchStatus, error) { + type BatchStatus struct { + BatchStatus uint } - return batcher.DispersalResponse{ - DispersalRequest: request, - Signatures: ddr.Signatures, - RespondedAt: ddr.RespondedAt, - Error: ddr.Error, - }, nil -} - -func MarshalBatchRecord(batch *batcher.BatchRecord) (map[string]types.AttributeValue, error) { - fields, err := attributevalue.MarshalMap(ToDynamoBatchRecord(*batch)) + batch := BatchStatus{} + err := attributevalue.UnmarshalMap(item, &batch) if err != nil { return nil, err } - fields["SK"] = &types.AttributeValueMemberS{Value: batchSK + batch.ID.String()} - fields["CreatedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", batch.CreatedAt.UTC().Unix())} - return fields, nil + + status := batcher.BatchStatus(batch.BatchStatus) + + return &status, nil } -func MarshalMinibatchRecord(minibatch *batcher.MinibatchRecord) (map[string]types.AttributeValue, error) { - fields, err := attributevalue.MarshalMap(ToDynamoMinibatchRecord(*minibatch)) +func UnmarshalOperatorID(item commondynamodb.Item) (*core.OperatorID, error) { + type OperatorID struct { + OperatorID string + } + + dispersal := OperatorID{} + err := attributevalue.UnmarshalMap(item, &dispersal) if err != nil { return nil, err } - fields["SK"] = &types.AttributeValueMemberS{Value: minibatchSK + fmt.Sprintf("%d", minibatch.MinibatchIndex)} - return fields, nil -} -func MarshalDispersalRequest(request *batcher.DispersalRequest) (map[string]types.AttributeValue, error) { - fields, err := attributevalue.MarshalMap(ToDynamoDispersalRequest(*request)) + operatorID, err := core.OperatorIDFromHex(dispersal.OperatorID) if err != nil { return nil, err } - fields["SK"] = &types.AttributeValueMemberS{Value: dispersalRequestSK + fmt.Sprintf("%d#%s", request.MinibatchIndex, request.OperatorID.Hex())} - fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", request.RequestedAt.UTC().Unix())} - return fields, nil + + return &operatorID, nil } -func MarshalDispersalResponse(response *batcher.DispersalResponse) (map[string]types.AttributeValue, error) { - fields, err := attributevalue.MarshalMap(ToDynamoDispersalResponse(*response)) +func UnmarshalBatchRecord(item commondynamodb.Item) (*batcher.BatchRecord, error) { + batch := batcher.BatchRecord{} + err := attributevalue.UnmarshalMap(item, &batch) if err != nil { return nil, err } - fields["SK"] = &types.AttributeValueMemberS{Value: dispersalResponseSK + fmt.Sprintf("%d#%s", response.MinibatchIndex, response.OperatorID.Hex())} - fields["RespondedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.RespondedAt.UTC().Unix())} - fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.DispersalRequest.RequestedAt.UTC().Unix())} - return fields, nil -} -func UnmarshalBatchRecord(item commondynamodb.Item) (*batcher.BatchRecord, error) { - dbr := DynamoBatchRecord{} - err := attributevalue.UnmarshalMap(item, &dbr) + batchID, err := UnmarshalBatchID(item) if err != nil { return nil, err } + batch.ID = *batchID - batch, err := FromDynamoBatchRecord(dbr) + batchStatus, err := UnmarshalBatchStatus(item) if err != nil { return nil, err } + batch.Status = *batchStatus batch.CreatedAt = batch.CreatedAt.UTC() return &batch, nil } func UnmarshalMinibatchRecord(item commondynamodb.Item) (*batcher.MinibatchRecord, error) { - dbr := DynamoMinibatchRecord{} - err := attributevalue.UnmarshalMap(item, &dbr) + minibatch := batcher.MinibatchRecord{} + err := attributevalue.UnmarshalMap(item, &minibatch) if err != nil { return nil, err } - minibatch, err := FromDynamoMinibatchRecord(dbr) + batchID, err := UnmarshalBatchID(item) if err != nil { return nil, err } + minibatch.BatchID = *batchID + return &minibatch, nil } func UnmarshalDispersalRequest(item commondynamodb.Item) (*batcher.DispersalRequest, error) { - ddr := DynamoDispersalRequest{} - err := attributevalue.UnmarshalMap(item, &ddr) + request := batcher.DispersalRequest{} + err := attributevalue.UnmarshalMap(item, &request) if err != nil { return nil, fmt.Errorf("failed to unmarshal dispersal request from DynamoDB: %v", err) } - request, err := FromDynamoDispersalRequest(ddr) + batchID, err := UnmarshalBatchID(item) + if err != nil { + return nil, err + } + request.BatchID = *batchID + + operatorID, err := UnmarshalOperatorID(item) if err != nil { return nil, err } + request.OperatorID = *operatorID request.RequestedAt = request.RequestedAt.UTC() return &request, nil } func UnmarshalDispersalResponse(item commondynamodb.Item) (*batcher.DispersalResponse, error) { - ddr := DynamoDispersalResponse{} - err := attributevalue.UnmarshalMap(item, &ddr) + response := batcher.DispersalResponse{} + err := attributevalue.UnmarshalMap(item, &response) if err != nil { return nil, err } - response, err := FromDynamoDispersalResponse(ddr) + batchID, err := UnmarshalBatchID(item) if err != nil { return nil, err } + response.BatchID = *batchID + + operatorID, err := UnmarshalOperatorID(item) + if err != nil { + return nil, err + } + response.OperatorID = *operatorID + response.RespondedAt = response.RespondedAt.UTC() response.DispersalRequest.RequestedAt = response.DispersalRequest.RequestedAt.UTC() return &response, nil @@ -360,8 +296,8 @@ func (m *MinibatchStore) PutBatch(ctx context.Context, batch *batcher.BatchRecor if err != nil { return err } - - return m.dynamoDBClient.PutItem(ctx, m.tableName, item) + constraint := "attribute_not_exists(BatchID) AND attribute_not_exists(SK)" + return m.dynamoDBClient.PutItemWithCondition(ctx, m.tableName, item, constraint) } func (m *MinibatchStore) PutMinibatch(ctx context.Context, minibatch *batcher.MinibatchRecord) error { @@ -397,7 +333,7 @@ func (m *MinibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batc Value: batchID.String(), }, "SK": &types.AttributeValueMemberS{ - Value: batchSK + batchID.String(), + Value: batchSKPrefix + batchID.String(), }, }) if err != nil { @@ -481,7 +417,7 @@ func (m *MinibatchStore) UpdateBatchStatus(ctx context.Context, batchID uuid.UUI } _, err := m.dynamoDBClient.UpdateItem(ctx, m.tableName, map[string]types.AttributeValue{ "BatchID": &types.AttributeValueMemberS{Value: batchID.String()}, - "SK": &types.AttributeValueMemberS{Value: batchSK + batchID.String()}, + "SK": &types.AttributeValueMemberS{Value: batchSKPrefix + batchID.String()}, }, commondynamodb.Item{ "BatchStatus": &types.AttributeValueMemberN{ Value: strconv.Itoa(int(status)), @@ -501,7 +437,7 @@ func (m *MinibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, mi Value: batchID.String(), }, "SK": &types.AttributeValueMemberS{ - Value: minibatchSK + fmt.Sprintf("%d", minibatchIndex), + Value: minibatchSKPrefix + fmt.Sprintf("%d", minibatchIndex), }, }) if err != nil { @@ -527,7 +463,7 @@ func (m *MinibatchStore) GetMinibatches(ctx context.Context, batchID uuid.UUID) Value: batchID.String(), }, ":prefix": &types.AttributeValueMemberS{ - Value: minibatchSK, + Value: minibatchSKPrefix, }, }) if err != nil { @@ -552,7 +488,7 @@ func (m *MinibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.U Value: batchID.String(), }, "SK": &types.AttributeValueMemberS{ - Value: dispersalRequestSK + fmt.Sprintf("%d#%s", minibatchIndex, opID.Hex()), + Value: dispersalRequestSKPrefix + fmt.Sprintf("%d#%s", minibatchIndex, opID.Hex()), }, }) if err != nil { @@ -578,7 +514,7 @@ func (m *MinibatchStore) GetDispersalRequests(ctx context.Context, batchID uuid. Value: batchID.String(), }, ":prefix": &types.AttributeValueMemberS{ - Value: dispersalRequestSK, + Value: dispersalRequestSKPrefix, }, }) if err != nil { @@ -603,7 +539,7 @@ func (m *MinibatchStore) GetMinibatchDispersalRequests(ctx context.Context, batc Value: batchID.String(), }, ":sk": &types.AttributeValueMemberS{ - Value: dispersalRequestSK + fmt.Sprintf("%s#%d", batchID.String(), minibatchIndex), + Value: dispersalRequestSKPrefix + fmt.Sprintf("%s#%d", batchID.String(), minibatchIndex), }, }) if err != nil { @@ -632,7 +568,7 @@ func (m *MinibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid. Value: batchID.String(), }, "SK": &types.AttributeValueMemberS{ - Value: dispersalResponseSK + fmt.Sprintf("%d#%s", minibatchIndex, opID.Hex()), + Value: dispersalResponseSKPrefix + fmt.Sprintf("%d#%s", minibatchIndex, opID.Hex()), }, }) if err != nil { @@ -658,7 +594,7 @@ func (m *MinibatchStore) GetDispersalResponses(ctx context.Context, batchID uuid Value: batchID.String(), }, ":prefix": &types.AttributeValueMemberS{ - Value: dispersalResponseSK, + Value: dispersalResponseSKPrefix, }, }) if err != nil { @@ -683,7 +619,7 @@ func (m *MinibatchStore) GetMinibatchDispersalResponses(ctx context.Context, bat Value: batchID.String(), }, ":sk": &types.AttributeValueMemberS{ - Value: dispersalResponseSK + fmt.Sprintf("%s#%d", batchID.String(), minibatchIndex), + Value: dispersalResponseSKPrefix + fmt.Sprintf("%s#%d", batchID.String(), minibatchIndex), }, }) if err != nil { diff --git a/disperser/batcher/batchstore/minibatch_store_test.go b/disperser/batcher/batchstore/minibatch_store_test.go index 21aa5bcd5b..5e3d4c05e7 100644 --- a/disperser/batcher/batchstore/minibatch_store_test.go +++ b/disperser/batcher/batchstore/minibatch_store_test.go @@ -38,6 +38,12 @@ var ( ) func setup(m *testing.M) { + deployLocalStack = !(os.Getenv("DEPLOY_LOCALSTACK") == "false") + fmt.Printf("deployLocalStack: %v\n", deployLocalStack) + if !deployLocalStack { + localStackPort = os.Getenv("LOCALSTACK_PORT") + } + if deployLocalStack { var err error dockertestPool, dockertestResource, err = deploy.StartDockertestWithLocalstackContainer(localStackPort) @@ -99,17 +105,28 @@ func TestPutBatch(t *testing.T) { } err = minibatchStore.PutBatch(ctx, batch) assert.NoError(t, err) + err = minibatchStore.PutBatch(ctx, batch) + assert.Error(t, err) b, err := minibatchStore.GetBatch(ctx, batch.ID) assert.NoError(t, err) assert.Equal(t, batch, b) err = minibatchStore.UpdateBatchStatus(ctx, batch.ID, batcher.BatchStatusFormed) assert.NoError(t, err) - err = minibatchStore.UpdateBatchStatus(ctx, batch.ID, 4) - assert.Error(t, err) u, err := minibatchStore.GetBatch(ctx, batch.ID) assert.NoError(t, err) - assert.Equal(t, u.Status, batcher.BatchStatusFormed) - assert.Equal(t, batch, b) + assert.Equal(t, batcher.BatchStatusFormed, u.Status) + err = minibatchStore.UpdateBatchStatus(ctx, batch.ID, batcher.BatchStatusAttested) + assert.NoError(t, err) + u, err = minibatchStore.GetBatch(ctx, batch.ID) + assert.NoError(t, err) + assert.Equal(t, batcher.BatchStatusAttested, u.Status) + err = minibatchStore.UpdateBatchStatus(ctx, batch.ID, batcher.BatchStatusFailed) + assert.NoError(t, err) + u, err = minibatchStore.GetBatch(ctx, batch.ID) + assert.NoError(t, err) + assert.Equal(t, batcher.BatchStatusFailed, u.Status) + err = minibatchStore.UpdateBatchStatus(ctx, batch.ID, 4) + assert.Error(t, err) } func TestGetBatchesByStatus(t *testing.T) { @@ -122,7 +139,7 @@ func TestGetBatchesByStatus(t *testing.T) { ID: id1, CreatedAt: ts, ReferenceBlockNumber: 1, - Status: batcher.BatchStatusAttested, + Status: batcher.BatchStatusFormed, HeaderHash: [32]byte{1}, AggregatePubKey: nil, AggregateSignature: nil, @@ -132,7 +149,7 @@ func TestGetBatchesByStatus(t *testing.T) { ID: id2, CreatedAt: ts, ReferenceBlockNumber: 1, - Status: batcher.BatchStatusAttested, + Status: batcher.BatchStatusFormed, HeaderHash: [32]byte{1}, AggregatePubKey: nil, AggregateSignature: nil, @@ -143,7 +160,7 @@ func TestGetBatchesByStatus(t *testing.T) { ID: id3, CreatedAt: ts, ReferenceBlockNumber: 1, - Status: batcher.BatchStatusAttested, + Status: batcher.BatchStatusFormed, HeaderHash: [32]byte{1}, AggregatePubKey: nil, AggregateSignature: nil, @@ -151,24 +168,20 @@ func TestGetBatchesByStatus(t *testing.T) { err = minibatchStore.PutBatch(ctx, batch3) assert.NoError(t, err) - pending, err := minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusAttested) - assert.NoError(t, err) - assert.Equal(t, 3, len(pending)) - - err = minibatchStore.UpdateBatchStatus(ctx, id1, batcher.BatchStatusFormed) + attested, err := minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusAttested) assert.NoError(t, err) + assert.Equal(t, 0, len(attested)) formed, err := minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusFormed) assert.NoError(t, err) - assert.Equal(t, 2, len(formed)) + assert.Equal(t, 3, len(formed)) - pending, err = minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusAttested) + err = minibatchStore.UpdateBatchStatus(ctx, id1, batcher.BatchStatusAttested) assert.NoError(t, err) - assert.Equal(t, 2, len(pending)) - failed, err := minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusFailed) + formed, err = minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusFormed) assert.NoError(t, err) - assert.Equal(t, 0, len(failed)) + assert.Equal(t, 2, len(formed)) } func TestPutMinibatch(t *testing.T) { diff --git a/disperser/batcher/minibatch_store.go b/disperser/batcher/minibatch_store.go index d538c4384c..2e96e5343c 100644 --- a/disperser/batcher/minibatch_store.go +++ b/disperser/batcher/minibatch_store.go @@ -29,17 +29,17 @@ const ( ) type BatchRecord struct { - ID uuid.UUID + ID uuid.UUID `dynamodbav:"-"` CreatedAt time.Time ReferenceBlockNumber uint - Status BatchStatus + Status BatchStatus `dynamodbav:"-"` HeaderHash [32]byte AggregatePubKey *core.G2Point AggregateSignature *core.Signature } type MinibatchRecord struct { - BatchID uuid.UUID + BatchID uuid.UUID `dynamodbav:"-"` MinibatchIndex uint BlobHeaderHashes [][32]byte BatchSize uint64 // in bytes @@ -47,9 +47,9 @@ type MinibatchRecord struct { } type DispersalRequest struct { - BatchID uuid.UUID - MinibatchIndex uint - core.OperatorID + BatchID uuid.UUID `dynamodbav:"-"` + MinibatchIndex uint + core.OperatorID `dynamodbav:"-"` OperatorAddress gcommon.Address Socket string NumBlobs uint From 4551c2a57345f3d004ec91b352f60cdac4e9d9ed Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Thu, 25 Jul 2024 14:27:26 -0700 Subject: [PATCH 8/8] Lint --- .../batcher/batchstore/minibatch_store.go | 23 ------------------- disperser/batcher/minibatch_store.go | 2 +- 2 files changed, 1 insertion(+), 24 deletions(-) diff --git a/disperser/batcher/batchstore/minibatch_store.go b/disperser/batcher/batchstore/minibatch_store.go index cf69c7b24d..5fe1bf3c79 100644 --- a/disperser/batcher/batchstore/minibatch_store.go +++ b/disperser/batcher/batchstore/minibatch_store.go @@ -110,7 +110,6 @@ func MarshalBatchRecord(batch *batcher.BatchRecord) (map[string]types.AttributeV return nil, err } fields["BatchID"] = &types.AttributeValueMemberS{Value: batch.ID.String()} - fields["BatchStatus"] = &types.AttributeValueMemberN{Value: strconv.Itoa(int(batch.Status))} fields["SK"] = &types.AttributeValueMemberS{Value: batchSKPrefix + batch.ID.String()} fields["CreatedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", batch.CreatedAt.UTC().Unix())} return fields, nil @@ -170,22 +169,6 @@ func UnmarshalBatchID(item commondynamodb.Item) (*uuid.UUID, error) { return &batchID, nil } -func UnmarshalBatchStatus(item commondynamodb.Item) (*batcher.BatchStatus, error) { - type BatchStatus struct { - BatchStatus uint - } - - batch := BatchStatus{} - err := attributevalue.UnmarshalMap(item, &batch) - if err != nil { - return nil, err - } - - status := batcher.BatchStatus(batch.BatchStatus) - - return &status, nil -} - func UnmarshalOperatorID(item commondynamodb.Item) (*core.OperatorID, error) { type OperatorID struct { OperatorID string @@ -218,12 +201,6 @@ func UnmarshalBatchRecord(item commondynamodb.Item) (*batcher.BatchRecord, error } batch.ID = *batchID - batchStatus, err := UnmarshalBatchStatus(item) - if err != nil { - return nil, err - } - batch.Status = *batchStatus - batch.CreatedAt = batch.CreatedAt.UTC() return &batch, nil } diff --git a/disperser/batcher/minibatch_store.go b/disperser/batcher/minibatch_store.go index 2e96e5343c..d028113214 100644 --- a/disperser/batcher/minibatch_store.go +++ b/disperser/batcher/minibatch_store.go @@ -32,7 +32,7 @@ type BatchRecord struct { ID uuid.UUID `dynamodbav:"-"` CreatedAt time.Time ReferenceBlockNumber uint - Status BatchStatus `dynamodbav:"-"` + Status BatchStatus `dynamodbav:"BatchStatus"` HeaderHash [32]byte AggregatePubKey *core.G2Point AggregateSignature *core.Signature