Skip to content

Commit

Permalink
Create example GSI on OperatorID RequestedAt
Browse files Browse the repository at this point in the history
Replace DispersalRequest.OperatorID field with raw byte array def so that dynamo can use unmarshalMap
  • Loading branch information
pschork committed Jul 22, 2024
1 parent 79cc576 commit deba453
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 10 deletions.
40 changes: 36 additions & 4 deletions disperser/batcher/batchstore/minibatch_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit
AttributeName: aws.String("SK"),
AttributeType: types.ScalarAttributeTypeS,
},
{
AttributeName: aws.String("OperatorID"),
AttributeType: types.ScalarAttributeTypeB,
},
{
AttributeName: aws.String("RequestedAt"),
AttributeType: types.ScalarAttributeTypeN,
},
},
KeySchema: []types.KeySchemaElement{
{
Expand All @@ -61,8 +69,29 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit
KeyType: types.KeyTypeRange,
},
},
TableName: aws.String(tableName),
GlobalSecondaryIndexes: nil,
TableName: aws.String(tableName),
GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{
{
IndexName: aws.String("OperatorID_RequestedAt_Index"),
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("OperatorID"),
KeyType: types.KeyTypeHash,
},
{
AttributeName: aws.String("RequestedAt"),
KeyType: types.KeyTypeRange,
},
},
Projection: &types.Projection{
ProjectionType: types.ProjectionTypeAll,
},
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(readCapacityUnits),
WriteCapacityUnits: aws.Int64(writeCapacityUnits),
},
},
},
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(readCapacityUnits),
WriteCapacityUnits: aws.Int64(writeCapacityUnits),
Expand Down Expand Up @@ -109,7 +138,8 @@ func MarshalDispersalResponse(response *batcher.DispersalResponse) (map[string]t
}
fields["PK"] = &types.AttributeValueMemberS{Value: batchKey + response.BatchID.String()}
fields["SK"] = &types.AttributeValueMemberS{Value: dispersalResponseKey + fmt.Sprintf("%d", response.MinibatchIndex)}
fields["RespondedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.RequestedAt.UTC().Unix())}
fields["RespondedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.RespondedAt.UTC().Unix())}
fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.DispersalRequest.RequestedAt.UTC().Unix())}
return fields, nil
}
func UnmarshalBatchRecord(item commondynamodb.Item) (*batcher.BatchRecord, error) {
Expand All @@ -135,8 +165,9 @@ func UnmarshalDispersalRequest(item commondynamodb.Item) (*batcher.DispersalRequ
request := batcher.DispersalRequest{}
err := attributevalue.UnmarshalMap(item, &request)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to unmarshal dispersal request from DynamoDB: %v", err)
}

request.RequestedAt = request.RequestedAt.UTC()
return &request, nil
}
Expand All @@ -148,6 +179,7 @@ func UnmarshalDispersalResponse(item commondynamodb.Item) (*batcher.DispersalRes
return nil, err
}
response.RespondedAt = response.RespondedAt.UTC()
response.DispersalRequest.RequestedAt = response.DispersalRequest.RequestedAt.UTC()
return &response, nil
}

Expand Down
5 changes: 3 additions & 2 deletions disperser/batcher/batchstore/minibatch_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var (
dockertestResource *dockertest.Resource

deployLocalStack bool
localStackPort = "4566"
localStackPort = "4570"

dynamoClient *dynamodb.Client
minibatchStore *batchstore.MinibatchStore
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestPutDispersalRequest(t *testing.T) {
request := &batcher.DispersalRequest{
BatchID: id,
MinibatchIndex: 0,
OperatorID: core.OperatorID([32]byte{1}),
OperatorID: core.OperatorID([32]byte{123}),
OperatorAddress: gcommon.HexToAddress("0x0"),
NumBlobs: 1,
RequestedAt: ts,
Expand Down Expand Up @@ -165,4 +165,5 @@ func TestPutDispersalResponse(t *testing.T) {
r, err := minibatchStore.GetDispersalResponse(ctx, response.BatchID, response.MinibatchIndex)
assert.NoError(t, err)
assert.Equal(t, response, r)
assert.Error(t, err)
}
2 changes: 1 addition & 1 deletion disperser/batcher/inmem/minibatch_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestPutDispersalRequest(t *testing.T) {
request := &batcher.DispersalRequest{
BatchID: id,
MinibatchIndex: 0,
OperatorID: core.OperatorID([32]byte{1}),
OperatorID: core.OperatorID([32]byte{0}),
OperatorAddress: gcommon.HexToAddress("0x0"),
NumBlobs: 1,
RequestedAt: time.Now().UTC(),
Expand Down
6 changes: 3 additions & 3 deletions disperser/batcher/minibatch_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ type MinibatchRecord struct {
}

type DispersalRequest struct {
BatchID uuid.UUID
MinibatchIndex uint
core.OperatorID
BatchID uuid.UUID
MinibatchIndex uint
OperatorID [32]byte
OperatorAddress gcommon.Address
NumBlobs uint
RequestedAt time.Time
Expand Down

0 comments on commit deba453

Please sign in to comment.