diff --git a/README.md b/README.md index 0aab443070..6556661757 100644 --- a/README.md +++ b/README.md @@ -10,11 +10,11 @@ EigenDA is a secure, high-throughput, and decentralized data availability (DA) service built on top of Ethereum using the [EigenLayer](https://github.com/Layr-Labs/eigenlayer-contracts) restaking primitives. -To understand more how EigenDA works and how it transforms the modern landscape of data availability, continue reading [EigenDA introduction](https://www.blog.eigenlayer.xyz/intro-to-eigenda-hyperscale-data-availability-for-rollups/). +To understand more about how EigenDA works and how it transforms the modern landscape of data availability, continue reading [EigenDA introduction](https://www.blog.eigenlayer.xyz/intro-to-eigenda-hyperscale-data-availability-for-rollups/). To dive deep into the technical details, continue reading [EigenDA protocol spec](https://github.com/Layr-Labs/eigenda/blob/master/docs/spec/overview.md). -If you're interested in integrating your rollup with EigenDA, please fill out the [EigenDA questionnaire](https://docs.google.com/forms/d/e/1FAIpQLSez6PG-BL6C6Mc4QY1M--vbV219OGL_0Euv2zhJ1HmcUiU7cw/viewform). +If you're interested in integrating your rollup with EigenDA, please fill out the [EigenDA Partner Registration](https://docs.google.com/forms/d/e/1FAIpQLSdXvfxgRfIHWYu90FqN-2yyhgrYm9oExr0jSy7ERzbMUimJew/viewform). ## API Documentation @@ -38,4 +38,4 @@ We welcome all contributions! There are many ways to contribute to the project, - [Open an Issue](https://github.com/Layr-Labs/eigenda/issues/new/choose) - [EigenLayer/EigenDA forum](https://forum.eigenlayer.xyz/c/eigenda/9) - [Email](mailto:eigenda-support@eigenlabs.org) -- [Follow us on Twitter](https://twitter.com/eigenlayer) +- [Follow us on Twitter](https://twitter.com/eigen_da) diff --git a/api/clients/node_client.go b/api/clients/node_client.go index daca5b6e53..faac36944b 100644 --- a/api/clients/node_client.go +++ b/api/clients/node_client.go @@ -2,6 +2,7 @@ package clients import ( "context" + "errors" "time" "github.com/Layr-Labs/eigenda/api/grpc/node" @@ -120,7 +121,23 @@ func (c client) GetChunks( chunks := make([]*encoding.Frame, len(reply.GetChunks())) for i, data := range reply.GetChunks() { - chunk, err := new(encoding.Frame).Deserialize(data) + var chunk *encoding.Frame + switch reply.GetEncoding() { + case node.ChunkEncoding_GNARK: + chunk, err = new(encoding.Frame).DeserializeGnark(data) + case node.ChunkEncoding_GOB: + chunk, err = new(encoding.Frame).Deserialize(data) + case node.ChunkEncoding_UNKNOWN: + // For backward compatibility, we fallback the UNKNOWN to GNARK + chunk, err = new(encoding.Frame).DeserializeGnark(data) + if err != nil { + chunksChan <- RetrievedChunks{ + OperatorID: opID, + Err: errors.New("UNKNOWN chunk encoding format"), + Chunks: nil, + } + } + } if err != nil { chunksChan <- RetrievedChunks{ OperatorID: opID, diff --git a/api/docs/disperser.md b/api/docs/disperser.md index d472f0b5c9..b5c85b9f84 100644 --- a/api/docs/disperser.md +++ b/api/docs/disperser.md @@ -154,7 +154,7 @@ replay attacks in the event that a signature is leaked. | Field | Type | Label | Description | | ----- | ---- | ----- | ----------- | | commitment | [common.G1Commitment](#common-G1Commitment) | | KZG commitment of the blob. | -| data_length | [uint32](#uint32) | | The length of the blob in symbols (each symbol is 31 bytes). | +| data_length | [uint32](#uint32) | | The length of the blob in symbols (each symbol is 32 bytes). | | blob_quorum_params | [BlobQuorumParam](#disperser-BlobQuorumParam) | repeated | The params of the quorums that this blob participates in. | @@ -270,7 +270,7 @@ BlobStatusRequest is used to query the status of a blob. | Field | Type | Label | Description | | ----- | ---- | ----- | ----------- | -| data | [bytes](#bytes) | | The data to be dispersed. The size of data must be <= 2MiB. | +| data | [bytes](#bytes) | | The data to be dispersed. The size of data must be <= 2MiB. Every 32 bytes of data chunk is interpreted as an integer in big endian format where the lower address has more significant bits. The integer must stay in the valid range to be interpreted as a field element on the bn254 curve. The valid range is 0 <= x < 21888242871839275222246405745257275088548364400416034343698204186575808495617 containing slightly less than 254 bits and more than 253 bits. If any one of the 32 bytes chunk is outside the range, the whole request is deemed as invalid, and rejected. | | custom_quorum_numbers | [uint32](#uint32) | repeated | The quorums to which the blob will be sent, in addition to the required quorums which are configured on the EigenDA smart contract. If required quorums are included here, an error will be returned. The disperser will ensure that the encoded blobs for each quorum are all processed within the same batch. | | account_id | [string](#string) | | The account ID of the client. This should be a hex-encoded string of the ECSDA public key corresponding to the key used by the client to sign the BlobAuthHeader. | @@ -315,7 +315,17 @@ RetrieveBlobRequest contains parameters to retrieve the blob. ### BlobStatus - +BlobStatus represents the status of a blob. +The status of a blob is updated as the blob is processed by the disperser. +The status of a blob can be queried by the client using the GetBlobStatus API. +Intermediate states are states that the blob can be in while being processed, and it can be updated to a differet state: +- PROCESSING +- DISPERSING +- CONFIRMED +Terminal states are states that will not be updated to a different state: +- FAILED +- FINALIZED +- INSUFFICIENT_SIGNATURES | Name | Number | Description | | ---- | ------ | ----------- | @@ -325,6 +335,7 @@ RetrieveBlobRequest contains parameters to retrieve the blob. | FAILED | 3 | FAILED means that the blob has failed permanently (for reasons other than insufficient signatures, which is a separate state) | | FINALIZED | 4 | FINALIZED means that the block containing the blob's confirmation transaction has been finalized on Ethereum | | INSUFFICIENT_SIGNATURES | 5 | INSUFFICIENT_SIGNATURES means that the confirmation threshold for the blob was not met for at least one quorum. | +| DISPERSING | 6 | DISPERSING means that the blob is currently being dispersed to DA Nodes and being confirmed onchain | @@ -339,7 +350,7 @@ Disperser defines the public APIs for dispersing blobs. | Method Name | Request Type | Response Type | Description | | ----------- | ------------ | ------------- | ------------| -| DisperseBlob | [DisperseBlobRequest](#disperser-DisperseBlobRequest) | [DisperseBlobReply](#disperser-DisperseBlobReply) | This API accepts blob to disperse from clients. This executes the dispersal async, i.e. it returns once the request is accepted. The client could use GetBlobStatus() API to poll the processing status of the blob. | +| DisperseBlob | [DisperseBlobRequest](#disperser-DisperseBlobRequest) | [DisperseBlobReply](#disperser-DisperseBlobReply) | This API accepts blob to disperse from clients. This executes the dispersal async, i.e. it returns once the request is accepted. The client could use GetBlobStatus() API to poll the the processing status of the blob. | | DisperseBlobAuthenticated | [AuthenticatedRequest](#disperser-AuthenticatedRequest) stream | [AuthenticatedReply](#disperser-AuthenticatedReply) stream | DisperseBlobAuthenticated is similar to DisperseBlob, except that it requires the client to authenticate itself via the AuthenticationData message. The protoco is as follows: 1. The client sends a DisperseBlobAuthenticated request with the DisperseBlobRequest message 2. The Disperser sends back a BlobAuthHeader message containing information for the client to verify and sign. 3. The client verifies the BlobAuthHeader and sends back the signed BlobAuthHeader in an AuthenticationData message. 4. The Disperser verifies the signature and returns a DisperseBlobReply message. | | GetBlobStatus | [BlobStatusRequest](#disperser-BlobStatusRequest) | [BlobStatusReply](#disperser-BlobStatusReply) | This API is meant to be polled for the blob status. | | RetrieveBlob | [RetrieveBlobRequest](#disperser-RetrieveBlobRequest) | [RetrieveBlobReply](#disperser-RetrieveBlobReply) | This retrieves the requested blob from the Disperser's backend. This is a more efficient way to retrieve blobs than directly retrieving from the DA Nodes (see detail about this approach in api/proto/retriever/retriever.proto). The blob should have been initially dispersed via this Disperser service for this API to work. | diff --git a/api/docs/node.md b/api/docs/node.md index fe3d9149da..d36a0b5f8d 100644 --- a/api/docs/node.md +++ b/api/docs/node.md @@ -4,6 +4,8 @@ ## Table of Contents - [node/node.proto](#node_node-proto) + - [AttestBatchReply](#node-AttestBatchReply) + - [AttestBatchRequest](#node-AttestBatchRequest) - [BatchHeader](#node-BatchHeader) - [Blob](#node-Blob) - [BlobHeader](#node-BlobHeader) @@ -13,11 +15,17 @@ - [GetBlobHeaderReply](#node-GetBlobHeaderReply) - [GetBlobHeaderRequest](#node-GetBlobHeaderRequest) - [MerkleProof](#node-MerkleProof) + - [NodeInfoReply](#node-NodeInfoReply) + - [NodeInfoRequest](#node-NodeInfoRequest) - [RetrieveChunksReply](#node-RetrieveChunksReply) - [RetrieveChunksRequest](#node-RetrieveChunksRequest) + - [StoreBlobsReply](#node-StoreBlobsReply) + - [StoreBlobsRequest](#node-StoreBlobsRequest) - [StoreChunksReply](#node-StoreChunksReply) - [StoreChunksRequest](#node-StoreChunksRequest) + - [ChunkEncoding](#node-ChunkEncoding) + - [Dispersal](#node-Dispersal) - [Retrieval](#node-Retrieval) @@ -35,6 +43,37 @@ + + +### AttestBatchReply + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| signature | [bytes](#bytes) | | | + + + + + + + + +### AttestBatchRequest + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| batch_header | [BatchHeader](#node-BatchHeader) | | header of the batch | +| blob_header_hashes | [bytes](#bytes) | repeated | the header hashes of all blobs in the batch | + + + + + + ### BatchHeader @@ -86,6 +125,7 @@ single operator node. | length | [uint32](#uint32) | | The length of the original blob in number of symbols (in the field where the polynomial is defined). | | quorum_headers | [BlobQuorumInfo](#node-BlobQuorumInfo) | repeated | The params of the quorums that this blob participates in. | | account_id | [string](#string) | | The ID of the user who is dispersing this blob to EigenDA. | +| reference_block_number | [uint32](#uint32) | | The reference block number whose state is used to encode the blob | @@ -122,6 +162,7 @@ operator and a single quorum. | Field | Type | Label | Description | | ----- | ---- | ----- | ----------- | | chunks | [bytes](#bytes) | repeated | Each chunk corresponds to a collection of points on the polynomial. Each chunk has same number of points. | +| bundle | [bytes](#bytes) | | All chunks of the bundle encoded in a byte array. | @@ -195,6 +236,35 @@ See RetrieveChunksRequest for documentation of each parameter of GetBlobHeaderRe + + +### NodeInfoReply +Node info reply + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| semver | [string](#string) | | | +| arch | [string](#string) | | | +| os | [string](#string) | | | +| num_cpu | [uint32](#uint32) | | | +| mem_bytes | [uint64](#uint64) | | | + + + + + + + + +### NodeInfoRequest +Node info request + + + + + + ### RetrieveChunksReply @@ -204,6 +274,7 @@ See RetrieveChunksRequest for documentation of each parameter of GetBlobHeaderRe | Field | Type | Label | Description | | ----- | ---- | ----- | ----------- | | chunks | [bytes](#bytes) | repeated | All chunks the Node is storing for the requested blob per RetrieveChunksRequest. | +| encoding | [ChunkEncoding](#node-ChunkEncoding) | | How the above chunks encoded. | @@ -227,6 +298,37 @@ See RetrieveChunksRequest for documentation of each parameter of GetBlobHeaderRe + + +### StoreBlobsReply + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| signatures | [google.protobuf.BytesValue](#google-protobuf-BytesValue) | repeated | The operator's BLS sgnature signed on the blob header hashes. The ordering of the signatures must match the ordering of the blobs sent in the request, with empty signatures in the places for discarded blobs. | + + + + + + + + +### StoreBlobsRequest + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| blobs | [Blob](#node-Blob) | repeated | Blobs to store | +| reference_block_number | [uint32](#uint32) | | The reference block number whose state is used to encode the blobs | + + + + + + ### StoreChunksReply @@ -259,6 +361,20 @@ See RetrieveChunksRequest for documentation of each parameter of GetBlobHeaderRe + + + +### ChunkEncoding +This describes how the chunks returned in RetrieveChunksReply are encoded. +Used to facilitate the decoding of chunks. + +| Name | Number | Description | +| ---- | ------ | ----------- | +| UNKNOWN | 0 | | +| GNARK | 1 | | +| GOB | 2 | | + + @@ -272,6 +388,9 @@ See RetrieveChunksRequest for documentation of each parameter of GetBlobHeaderRe | Method Name | Request Type | Response Type | Description | | ----------- | ------------ | ------------- | ------------| | StoreChunks | [StoreChunksRequest](#node-StoreChunksRequest) | [StoreChunksReply](#node-StoreChunksReply) | StoreChunks validates that the chunks match what the Node is supposed to receive ( different Nodes are responsible for different chunks, as EigenDA is horizontally sharded) and is correctly coded (e.g. each chunk must be a valid KZG multiproof) according to the EigenDA protocol. It also stores the chunks along with metadata for the protocol-defined length of custody. It will return a signature at the end to attest to the data in this request it has processed. | +| StoreBlobs | [StoreBlobsRequest](#node-StoreBlobsRequest) | [StoreBlobsReply](#node-StoreBlobsReply) | StoreBlobs is simiar to StoreChunks, but it stores the blobs using a different storage schema so that the stored blobs can later be aggregated by AttestBatch method to a bigger batch. StoreBlobs + AttestBatch will eventually replace and deprecate StoreChunks method. | +| AttestBatch | [AttestBatchRequest](#node-AttestBatchRequest) | [AttestBatchReply](#node-AttestBatchReply) | AttestBatch is used to aggregate the batches stored by StoreBlobs method to a bigger batch. It will return a signature at the end to attest to the aggregated batch. | +| NodeInfo | [NodeInfoRequest](#node-NodeInfoRequest) | [NodeInfoReply](#node-NodeInfoReply) | Retrieve node info metadata | @@ -282,7 +401,8 @@ See RetrieveChunksRequest for documentation of each parameter of GetBlobHeaderRe | Method Name | Request Type | Response Type | Description | | ----------- | ------------ | ------------- | ------------| | RetrieveChunks | [RetrieveChunksRequest](#node-RetrieveChunksRequest) | [RetrieveChunksReply](#node-RetrieveChunksReply) | RetrieveChunks retrieves the chunks for a blob custodied at the Node. | -| GetBlobHeader | [GetBlobHeaderRequest](#node-GetBlobHeaderRequest) | [GetBlobHeaderReply](#node-GetBlobHeaderReply) | Similar to RetrieveChunks, this just returns the header of the blob. | +| GetBlobHeader | [GetBlobHeaderRequest](#node-GetBlobHeaderRequest) | [GetBlobHeaderReply](#node-GetBlobHeaderReply) | GetBlobHeader is similar to RetrieveChunks, this just returns the header of the blob. | +| NodeInfo | [NodeInfoRequest](#node-NodeInfoRequest) | [NodeInfoReply](#node-NodeInfoReply) | Retrieve node info metadata | diff --git a/api/grpc/mock/node_disperser_client.go b/api/grpc/mock/node_disperser_client.go new file mode 100644 index 0000000000..a3d296dd21 --- /dev/null +++ b/api/grpc/mock/node_disperser_client.go @@ -0,0 +1,39 @@ +package mock + +import ( + "context" + + "github.com/Layr-Labs/eigenda/api/grpc/node" + "github.com/stretchr/testify/mock" + "google.golang.org/grpc" +) + +type MockNodeDispersalClient struct { + mock.Mock +} + +var _ node.DispersalClient = (*MockNodeDispersalClient)(nil) + +func NewMockDispersalClient() *MockNodeDispersalClient { + return &MockNodeDispersalClient{} +} + +func (m *MockNodeDispersalClient) StoreChunks(ctx context.Context, in *node.StoreChunksRequest, opts ...grpc.CallOption) (*node.StoreChunksReply, error) { + args := m.Called() + return args.Get(0).(*node.StoreChunksReply), args.Error(1) +} + +func (m *MockNodeDispersalClient) StoreBlobs(ctx context.Context, in *node.StoreBlobsRequest, opts ...grpc.CallOption) (*node.StoreBlobsReply, error) { + args := m.Called() + return args.Get(0).(*node.StoreBlobsReply), args.Error(1) +} + +func (m *MockNodeDispersalClient) AttestBatch(ctx context.Context, in *node.AttestBatchRequest, opts ...grpc.CallOption) (*node.AttestBatchReply, error) { + args := m.Called() + return args.Get(0).(*node.AttestBatchReply), args.Error(1) +} + +func (m *MockNodeDispersalClient) NodeInfo(ctx context.Context, in *node.NodeInfoRequest, opts ...grpc.CallOption) (*node.NodeInfoReply, error) { + args := m.Called() + return args.Get(0).(*node.NodeInfoReply), args.Error(1) +} diff --git a/common/aws/dynamodb/client.go b/common/aws/dynamodb/client.go index 04cc7d75ab..2e309a86bd 100644 --- a/common/aws/dynamodb/client.go +++ b/common/aws/dynamodb/client.go @@ -105,6 +105,18 @@ func (c *Client) PutItem(ctx context.Context, tableName string, item Item) (err return nil } +func (c *Client) PutItemWithCondition(ctx context.Context, tableName string, item Item, condition string) (err error) { + _, err = c.dynamoClient.PutItem(ctx, &dynamodb.PutItemInput{ + TableName: aws.String(tableName), Item: item, + ConditionExpression: aws.String(condition), + }) + if err != nil { + return err + } + + return nil +} + // PutItems puts items in batches of 25 items (which is a limit DynamoDB imposes) // It returns the items that failed to be put. func (c *Client) PutItems(ctx context.Context, tableName string, items []Item) ([]Item, error) { @@ -166,6 +178,20 @@ func (c *Client) QueryIndex(ctx context.Context, tableName string, indexName str return response.Items, nil } +// Query returns all items in the primary index that match the given expression +func (c *Client) Query(ctx context.Context, tableName string, keyCondition string, expAttributeValues ExpresseionValues) ([]Item, error) { + response, err := c.dynamoClient.Query(ctx, &dynamodb.QueryInput{ + TableName: aws.String(tableName), + KeyConditionExpression: aws.String(keyCondition), + ExpressionAttributeValues: expAttributeValues, + }) + if err != nil { + return nil, err + } + + return response.Items, nil +} + // QueryIndexCount returns the count of the items in the index that match the given key func (c *Client) QueryIndexCount(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpresseionValues) (int32, error) { response, err := c.dynamoClient.Query(ctx, &dynamodb.QueryInput{ diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 753b7c55a8..747649bdcc 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -964,13 +964,8 @@ func (s *DispersalServer) validateRequestAndGetBlob(ctx context.Context, req *pb if _, ok := seenQuorums[quorumID]; ok { return nil, fmt.Errorf("custom_quorum_numbers should not include the required quorums %v, but required quorum %d was found", quorumConfig.RequiredQuorums, quorumID) } - if s.serverConfig.EnableDualQuorums { - seenQuorums[quorumID] = struct{}{} - } else if quorumID == 0 { - // If dual quorum staking is not enabled, we only consider the quorum 0 as the - // required quorum. - seenQuorums[quorumID] = struct{}{} - } + + seenQuorums[quorumID] = struct{}{} } if len(seenQuorums) == 0 { diff --git a/disperser/batcher/batchstore/minibatch_store.go b/disperser/batcher/batchstore/minibatch_store.go new file mode 100644 index 0000000000..5fe1bf3c79 --- /dev/null +++ b/disperser/batcher/batchstore/minibatch_store.go @@ -0,0 +1,620 @@ +package batchstore + +import ( + "context" + "fmt" + "strconv" + "time" + + commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser/batcher" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/google/uuid" +) + +const ( + batchStatusIndexName = "BatchStatusIndex" + batchSKPrefix = "BATCH#" + minibatchSKPrefix = "MINIBATCH#" + dispersalRequestSKPrefix = "DISPERSAL_REQUEST#" + dispersalResponseSKPrefix = "DISPERSAL_RESPONSE#" +) + +type MinibatchStore struct { + dynamoDBClient *commondynamodb.Client + tableName string + logger logging.Logger + ttl time.Duration +} + +var _ batcher.MinibatchStore = (*MinibatchStore)(nil) + +func NewMinibatchStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *MinibatchStore { + logger.Debugf("creating minibatch store with table %s with TTL: %s", tableName, ttl) + return &MinibatchStore{ + dynamoDBClient: dynamoDBClient, + logger: logger.With("component", "MinibatchStore"), + tableName: tableName, + ttl: ttl, + } +} + +func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput { + return &dynamodb.CreateTableInput{ + AttributeDefinitions: []types.AttributeDefinition{ + { + AttributeName: aws.String("BatchID"), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String("SK"), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String("BatchStatus"), + AttributeType: types.ScalarAttributeTypeN, + }, + { + AttributeName: aws.String("CreatedAt"), + AttributeType: types.ScalarAttributeTypeN, + }, + }, + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("BatchID"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("SK"), + KeyType: types.KeyTypeRange, + }, + }, + TableName: aws.String(tableName), + GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ + { + IndexName: aws.String(batchStatusIndexName), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("BatchStatus"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("CreatedAt"), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + }, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + } +} + +func MarshalBatchRecord(batch *batcher.BatchRecord) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(*batch) + if err != nil { + return nil, err + } + fields["BatchID"] = &types.AttributeValueMemberS{Value: batch.ID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: batchSKPrefix + batch.ID.String()} + fields["CreatedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", batch.CreatedAt.UTC().Unix())} + return fields, nil +} + +func MarshalMinibatchRecord(minibatch *batcher.MinibatchRecord) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(*minibatch) + if err != nil { + return nil, err + } + fields["BatchID"] = &types.AttributeValueMemberS{Value: minibatch.BatchID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: minibatchSKPrefix + fmt.Sprintf("%d", minibatch.MinibatchIndex)} + return fields, nil +} + +func MarshalDispersalRequest(request *batcher.DispersalRequest) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(*request) + if err != nil { + return nil, err + } + fields["BatchID"] = &types.AttributeValueMemberS{Value: request.BatchID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: dispersalRequestSKPrefix + fmt.Sprintf("%d#%s", request.MinibatchIndex, request.OperatorID.Hex())} + fields["OperatorID"] = &types.AttributeValueMemberS{Value: request.OperatorID.Hex()} + fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", request.RequestedAt.UTC().Unix())} + return fields, nil +} + +func MarshalDispersalResponse(response *batcher.DispersalResponse) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(*response) + if err != nil { + return nil, err + } + fields["BatchID"] = &types.AttributeValueMemberS{Value: response.BatchID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: dispersalResponseSKPrefix + fmt.Sprintf("%d#%s", response.MinibatchIndex, response.OperatorID.Hex())} + fields["OperatorID"] = &types.AttributeValueMemberS{Value: response.OperatorID.Hex()} + fields["RespondedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.RespondedAt.UTC().Unix())} + fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.DispersalRequest.RequestedAt.UTC().Unix())} + return fields, nil +} + +func UnmarshalBatchID(item commondynamodb.Item) (*uuid.UUID, error) { + type BatchID struct { + BatchID string + } + + batch := BatchID{} + err := attributevalue.UnmarshalMap(item, &batch) + if err != nil { + return nil, err + } + + batchID, err := uuid.Parse(batch.BatchID) + if err != nil { + return nil, err + } + + return &batchID, nil +} + +func UnmarshalOperatorID(item commondynamodb.Item) (*core.OperatorID, error) { + type OperatorID struct { + OperatorID string + } + + dispersal := OperatorID{} + err := attributevalue.UnmarshalMap(item, &dispersal) + if err != nil { + return nil, err + } + + operatorID, err := core.OperatorIDFromHex(dispersal.OperatorID) + if err != nil { + return nil, err + } + + return &operatorID, nil +} + +func UnmarshalBatchRecord(item commondynamodb.Item) (*batcher.BatchRecord, error) { + batch := batcher.BatchRecord{} + err := attributevalue.UnmarshalMap(item, &batch) + if err != nil { + return nil, err + } + + batchID, err := UnmarshalBatchID(item) + if err != nil { + return nil, err + } + batch.ID = *batchID + + batch.CreatedAt = batch.CreatedAt.UTC() + return &batch, nil +} + +func UnmarshalMinibatchRecord(item commondynamodb.Item) (*batcher.MinibatchRecord, error) { + minibatch := batcher.MinibatchRecord{} + err := attributevalue.UnmarshalMap(item, &minibatch) + if err != nil { + return nil, err + } + + batchID, err := UnmarshalBatchID(item) + if err != nil { + return nil, err + } + minibatch.BatchID = *batchID + + return &minibatch, nil +} + +func UnmarshalDispersalRequest(item commondynamodb.Item) (*batcher.DispersalRequest, error) { + request := batcher.DispersalRequest{} + err := attributevalue.UnmarshalMap(item, &request) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal dispersal request from DynamoDB: %v", err) + } + + batchID, err := UnmarshalBatchID(item) + if err != nil { + return nil, err + } + request.BatchID = *batchID + + operatorID, err := UnmarshalOperatorID(item) + if err != nil { + return nil, err + } + request.OperatorID = *operatorID + + request.RequestedAt = request.RequestedAt.UTC() + return &request, nil +} + +func UnmarshalDispersalResponse(item commondynamodb.Item) (*batcher.DispersalResponse, error) { + response := batcher.DispersalResponse{} + err := attributevalue.UnmarshalMap(item, &response) + if err != nil { + return nil, err + } + + batchID, err := UnmarshalBatchID(item) + if err != nil { + return nil, err + } + response.BatchID = *batchID + + operatorID, err := UnmarshalOperatorID(item) + if err != nil { + return nil, err + } + response.OperatorID = *operatorID + + response.RespondedAt = response.RespondedAt.UTC() + response.DispersalRequest.RequestedAt = response.DispersalRequest.RequestedAt.UTC() + return &response, nil +} + +func (m *MinibatchStore) PutBatch(ctx context.Context, batch *batcher.BatchRecord) error { + item, err := MarshalBatchRecord(batch) + if err != nil { + return err + } + constraint := "attribute_not_exists(BatchID) AND attribute_not_exists(SK)" + return m.dynamoDBClient.PutItemWithCondition(ctx, m.tableName, item, constraint) +} + +func (m *MinibatchStore) PutMinibatch(ctx context.Context, minibatch *batcher.MinibatchRecord) error { + item, err := MarshalMinibatchRecord(minibatch) + if err != nil { + return err + } + + return m.dynamoDBClient.PutItem(ctx, m.tableName, item) +} + +func (m *MinibatchStore) PutDispersalRequest(ctx context.Context, request *batcher.DispersalRequest) error { + item, err := MarshalDispersalRequest(request) + if err != nil { + return err + } + + return m.dynamoDBClient.PutItem(ctx, m.tableName, item) +} + +func (m *MinibatchStore) PutDispersalResponse(ctx context.Context, response *batcher.DispersalResponse) error { + item, err := MarshalDispersalResponse(response) + if err != nil { + return err + } + + return m.dynamoDBClient.PutItem(ctx, m.tableName, item) +} + +func (m *MinibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batcher.BatchRecord, error) { + item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ + "BatchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + "SK": &types.AttributeValueMemberS{ + Value: batchSKPrefix + batchID.String(), + }, + }) + if err != nil { + m.logger.Errorf("failed to get batch from DynamoDB: %v", err) + return nil, err + } + + if item == nil { + return nil, nil + } + + batch, err := UnmarshalBatchRecord(item) + if err != nil { + m.logger.Errorf("failed to unmarshal batch record from DynamoDB: %v", err) + return nil, err + } + return batch, nil +} + +func (m *MinibatchStore) BatchDispersed(ctx context.Context, batchID uuid.UUID) (bool, error) { + dispersalRequests, err := m.GetDispersalRequests(ctx, batchID) + if err != nil { + return false, fmt.Errorf("failed to get dispersal requests for batch %s - %v", batchID.String(), err) + + } + dispersalResponses, err := m.GetDispersalResponses(ctx, batchID) + if err != nil { + return false, fmt.Errorf("failed to get dispersal responses for batch %s - %v", batchID.String(), err) + } + if len(dispersalRequests) != len(dispersalResponses) { + m.logger.Info("number of minibatch dispersal requests does not match responses", "batchID", batchID, "numRequests", len(dispersalRequests), "numResponses", len(dispersalResponses)) + return false, nil + } + if len(dispersalRequests) == 0 || len(dispersalResponses) == 0 { + m.logger.Info("no dispersal requests or responses found", "batchID", batchID) + return false, nil + } + return true, nil +} + +func (m *MinibatchStore) GetBatchesByStatus(ctx context.Context, status batcher.BatchStatus) ([]*batcher.BatchRecord, error) { + items, err := m.dynamoDBClient.QueryIndex(ctx, m.tableName, batchStatusIndexName, "BatchStatus = :status", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }}) + if err != nil { + return nil, err + } + + batches := make([]*batcher.BatchRecord, len(items)) + for i, item := range items { + batches[i], err = UnmarshalBatchRecord(item) + if err != nil { + m.logger.Errorf("failed to unmarshal batch record at index %d: %v", i, err) + return nil, err + } + } + + return batches, nil +} + +func (m *MinibatchStore) GetLatestFormedBatch(ctx context.Context) (batch *batcher.BatchRecord, minibatches []*batcher.MinibatchRecord, err error) { + formed, err := m.GetBatchesByStatus(ctx, batcher.BatchStatusFormed) + if err != nil { + return nil, nil, err + } + if len(formed) == 0 { + return nil, nil, nil + } + batch = formed[len(formed)-1] + minibatches, err = m.GetMinibatches(ctx, batch.ID) + if err != nil { + return nil, nil, err + } + return batch, minibatches, nil +} + +func (m *MinibatchStore) UpdateBatchStatus(ctx context.Context, batchID uuid.UUID, status batcher.BatchStatus) error { + if status < batcher.BatchStatusFormed || status > batcher.BatchStatusFailed { + return fmt.Errorf("invalid batch status %v", status) + } + _, err := m.dynamoDBClient.UpdateItem(ctx, m.tableName, map[string]types.AttributeValue{ + "BatchID": &types.AttributeValueMemberS{Value: batchID.String()}, + "SK": &types.AttributeValueMemberS{Value: batchSKPrefix + batchID.String()}, + }, commondynamodb.Item{ + "BatchStatus": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }, + }) + + if err != nil { + return fmt.Errorf("failed to update batch status: %v", err) + } + + return nil +} + +func (m *MinibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.MinibatchRecord, error) { + item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ + "BatchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + "SK": &types.AttributeValueMemberS{ + Value: minibatchSKPrefix + fmt.Sprintf("%d", minibatchIndex), + }, + }) + if err != nil { + m.logger.Errorf("failed to get minibatch from DynamoDB: %v", err) + return nil, err + } + + if item == nil { + return nil, nil + } + + minibatch, err := UnmarshalMinibatchRecord(item) + if err != nil { + m.logger.Errorf("failed to unmarshal minibatch record from DynamoDB: %v", err) + return nil, err + } + return minibatch, nil +} + +func (m *MinibatchStore) GetMinibatches(ctx context.Context, batchID uuid.UUID) ([]*batcher.MinibatchRecord, error) { + items, err := m.dynamoDBClient.Query(ctx, m.tableName, "BatchID = :batchID AND begins_with(SK, :prefix)", commondynamodb.ExpresseionValues{ + ":batchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + ":prefix": &types.AttributeValueMemberS{ + Value: minibatchSKPrefix, + }, + }) + if err != nil { + return nil, err + } + + minibatches := make([]*batcher.MinibatchRecord, len(items)) + for i, item := range items { + minibatches[i], err = UnmarshalMinibatchRecord(item) + if err != nil { + m.logger.Errorf("failed to unmarshal minibatch record at index %d: %v", i, err) + return nil, err + } + } + + return minibatches, nil +} + +func (m *MinibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*batcher.DispersalRequest, error) { + item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ + "BatchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + "SK": &types.AttributeValueMemberS{ + Value: dispersalRequestSKPrefix + fmt.Sprintf("%d#%s", minibatchIndex, opID.Hex()), + }, + }) + if err != nil { + m.logger.Errorf("failed to get dispersal request from DynamoDB: %v", err) + return nil, err + } + + if item == nil { + return nil, nil + } + + request, err := UnmarshalDispersalRequest(item) + if err != nil { + m.logger.Errorf("failed to unmarshal dispersal request from DynamoDB: %v", err) + return nil, err + } + return request, nil +} + +func (m *MinibatchStore) GetDispersalRequests(ctx context.Context, batchID uuid.UUID) ([]*batcher.DispersalRequest, error) { + items, err := m.dynamoDBClient.Query(ctx, m.tableName, "BatchID = :batchID AND begins_with(SK, :prefix)", commondynamodb.ExpresseionValues{ + ":batchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + ":prefix": &types.AttributeValueMemberS{ + Value: dispersalRequestSKPrefix, + }, + }) + if err != nil { + return nil, err + } + + requests := make([]*batcher.DispersalRequest, len(items)) + for i, item := range items { + requests[i], err = UnmarshalDispersalRequest(item) + if err != nil { + m.logger.Errorf("failed to unmarshal dispersal requests at index %d: %v", i, err) + return nil, err + } + } + + return requests, nil +} + +func (m *MinibatchStore) GetMinibatchDispersalRequests(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalRequest, error) { + items, err := m.dynamoDBClient.Query(ctx, m.tableName, "BatchID = :batchID AND SK = :sk", commondynamodb.ExpresseionValues{ + ":batchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + ":sk": &types.AttributeValueMemberS{ + Value: dispersalRequestSKPrefix + fmt.Sprintf("%s#%d", batchID.String(), minibatchIndex), + }, + }) + if err != nil { + return nil, err + } + + if len(items) == 0 { + return nil, fmt.Errorf("no dispersal requests found for BatchID %s MinibatchIndex %d", batchID, minibatchIndex) + } + + requests := make([]*batcher.DispersalRequest, len(items)) + for i, item := range items { + requests[i], err = UnmarshalDispersalRequest(item) + if err != nil { + m.logger.Errorf("failed to unmarshal dispersal requests at index %d: %v", i, err) + return nil, err + } + } + + return requests, nil +} + +func (m *MinibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*batcher.DispersalResponse, error) { + item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ + "BatchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + "SK": &types.AttributeValueMemberS{ + Value: dispersalResponseSKPrefix + fmt.Sprintf("%d#%s", minibatchIndex, opID.Hex()), + }, + }) + if err != nil { + m.logger.Errorf("failed to get dispersal response from DynamoDB: %v", err) + return nil, err + } + + if item == nil { + return nil, nil + } + + response, err := UnmarshalDispersalResponse(item) + if err != nil { + m.logger.Errorf("failed to unmarshal dispersal response from DynamoDB: %v", err) + return nil, err + } + return response, nil +} + +func (m *MinibatchStore) GetDispersalResponses(ctx context.Context, batchID uuid.UUID) ([]*batcher.DispersalResponse, error) { + items, err := m.dynamoDBClient.Query(ctx, m.tableName, "BatchID = :batchID AND begins_with(SK, :prefix)", commondynamodb.ExpresseionValues{ + ":batchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + ":prefix": &types.AttributeValueMemberS{ + Value: dispersalResponseSKPrefix, + }, + }) + if err != nil { + return nil, err + } + + responses := make([]*batcher.DispersalResponse, len(items)) + for i, item := range items { + responses[i], err = UnmarshalDispersalResponse(item) + if err != nil { + m.logger.Errorf("failed to unmarshal dispersal response at index %d: %v", i, err) + return nil, err + } + } + + return responses, nil +} + +func (m *MinibatchStore) GetMinibatchDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalResponse, error) { + items, err := m.dynamoDBClient.Query(ctx, m.tableName, "BatchID = :batchID AND SK = :sk", commondynamodb.ExpresseionValues{ + ":batchID": &types.AttributeValueMemberS{ + Value: batchID.String(), + }, + ":sk": &types.AttributeValueMemberS{ + Value: dispersalResponseSKPrefix + fmt.Sprintf("%s#%d", batchID.String(), minibatchIndex), + }, + }) + if err != nil { + return nil, err + } + + if len(items) == 0 { + return nil, fmt.Errorf("no dispersal responses found for BatchID %s MinibatchIndex %d", batchID, minibatchIndex) + } + + responses := make([]*batcher.DispersalResponse, len(items)) + for i, item := range items { + responses[i], err = UnmarshalDispersalResponse(item) + if err != nil { + m.logger.Errorf("failed to unmarshal dispersal response at index %d: %v", i, err) + return nil, err + } + } + + return responses, nil +} diff --git a/disperser/batcher/batchstore/minibatch_store_test.go b/disperser/batcher/batchstore/minibatch_store_test.go new file mode 100644 index 0000000000..5e3d4c05e7 --- /dev/null +++ b/disperser/batcher/batchstore/minibatch_store_test.go @@ -0,0 +1,373 @@ +package batchstore_test + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/Layr-Labs/eigenda/common/aws" + "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser/batcher" + "github.com/Layr-Labs/eigenda/disperser/batcher/batchstore" + "github.com/Layr-Labs/eigenda/inabox/deploy" + "github.com/Layr-Labs/eigensdk-go/logging" + gcommon "github.com/ethereum/go-ethereum/common" + "github.com/google/uuid" + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/assert" +) + +var ( + logger = logging.NewNoopLogger() + + dockertestPool *dockertest.Pool + dockertestResource *dockertest.Resource + + deployLocalStack bool + localStackPort = "4566" + + dynamoClient *dynamodb.Client + minibatchStore *batchstore.MinibatchStore + + UUID = uuid.New() + minibatchTableName = fmt.Sprintf("test-MinibatchStore-%v", UUID) +) + +func setup(m *testing.M) { + deployLocalStack = !(os.Getenv("DEPLOY_LOCALSTACK") == "false") + fmt.Printf("deployLocalStack: %v\n", deployLocalStack) + if !deployLocalStack { + localStackPort = os.Getenv("LOCALSTACK_PORT") + } + + if deployLocalStack { + var err error + dockertestPool, dockertestResource, err = deploy.StartDockertestWithLocalstackContainer(localStackPort) + if err != nil { + teardown() + panic("failed to start localstack container") + } + + } + + cfg := aws.ClientConfig{ + Region: "us-east-1", + AccessKey: "localstack", + SecretAccessKey: "localstack", + EndpointURL: fmt.Sprintf("http://0.0.0.0:%s", localStackPort), + } + + _, err := test_utils.CreateTable(context.Background(), cfg, minibatchTableName, batchstore.GenerateTableSchema(minibatchTableName, 10, 10)) + if err != nil { + teardown() + panic("failed to create dynamodb table: " + err.Error()) + } + + dynamoClient, err = dynamodb.NewClient(cfg, logger) + if err != nil { + teardown() + panic("failed to create dynamodb client: " + err.Error()) + } + + minibatchStore = batchstore.NewMinibatchStore(dynamoClient, logger, minibatchTableName, time.Hour) +} + +func teardown() { + if deployLocalStack { + deploy.PurgeDockertestResources(dockertestPool, dockertestResource) + } +} + +func TestMain(m *testing.M) { + setup(m) + code := m.Run() + teardown() + os.Exit(code) +} + +func TestPutBatch(t *testing.T) { + ctx := context.Background() + id, err := uuid.NewV7() + assert.NoError(t, err) + ts := time.Now().Truncate(time.Second).UTC() + batch := &batcher.BatchRecord{ + ID: id, + CreatedAt: ts, + ReferenceBlockNumber: 1, + Status: batcher.BatchStatusPending, + HeaderHash: [32]byte{1}, + AggregatePubKey: nil, + AggregateSignature: nil, + } + err = minibatchStore.PutBatch(ctx, batch) + assert.NoError(t, err) + err = minibatchStore.PutBatch(ctx, batch) + assert.Error(t, err) + b, err := minibatchStore.GetBatch(ctx, batch.ID) + assert.NoError(t, err) + assert.Equal(t, batch, b) + err = minibatchStore.UpdateBatchStatus(ctx, batch.ID, batcher.BatchStatusFormed) + assert.NoError(t, err) + u, err := minibatchStore.GetBatch(ctx, batch.ID) + assert.NoError(t, err) + assert.Equal(t, batcher.BatchStatusFormed, u.Status) + err = minibatchStore.UpdateBatchStatus(ctx, batch.ID, batcher.BatchStatusAttested) + assert.NoError(t, err) + u, err = minibatchStore.GetBatch(ctx, batch.ID) + assert.NoError(t, err) + assert.Equal(t, batcher.BatchStatusAttested, u.Status) + err = minibatchStore.UpdateBatchStatus(ctx, batch.ID, batcher.BatchStatusFailed) + assert.NoError(t, err) + u, err = minibatchStore.GetBatch(ctx, batch.ID) + assert.NoError(t, err) + assert.Equal(t, batcher.BatchStatusFailed, u.Status) + err = minibatchStore.UpdateBatchStatus(ctx, batch.ID, 4) + assert.Error(t, err) +} + +func TestGetBatchesByStatus(t *testing.T) { + ctx := context.Background() + id1, _ := uuid.NewV7() + id2, _ := uuid.NewV7() + id3, _ := uuid.NewV7() + ts := time.Now().Truncate(time.Second).UTC() + batch1 := &batcher.BatchRecord{ + ID: id1, + CreatedAt: ts, + ReferenceBlockNumber: 1, + Status: batcher.BatchStatusFormed, + HeaderHash: [32]byte{1}, + AggregatePubKey: nil, + AggregateSignature: nil, + } + _ = minibatchStore.PutBatch(ctx, batch1) + batch2 := &batcher.BatchRecord{ + ID: id2, + CreatedAt: ts, + ReferenceBlockNumber: 1, + Status: batcher.BatchStatusFormed, + HeaderHash: [32]byte{1}, + AggregatePubKey: nil, + AggregateSignature: nil, + } + err := minibatchStore.PutBatch(ctx, batch2) + assert.NoError(t, err) + batch3 := &batcher.BatchRecord{ + ID: id3, + CreatedAt: ts, + ReferenceBlockNumber: 1, + Status: batcher.BatchStatusFormed, + HeaderHash: [32]byte{1}, + AggregatePubKey: nil, + AggregateSignature: nil, + } + err = minibatchStore.PutBatch(ctx, batch3) + assert.NoError(t, err) + + attested, err := minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusAttested) + assert.NoError(t, err) + assert.Equal(t, 0, len(attested)) + + formed, err := minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusFormed) + assert.NoError(t, err) + assert.Equal(t, 3, len(formed)) + + err = minibatchStore.UpdateBatchStatus(ctx, id1, batcher.BatchStatusAttested) + assert.NoError(t, err) + + formed, err = minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusFormed) + assert.NoError(t, err) + assert.Equal(t, 2, len(formed)) +} + +func TestPutMinibatch(t *testing.T) { + ctx := context.Background() + id, err := uuid.NewV7() + assert.NoError(t, err) + minibatch := &batcher.MinibatchRecord{ + BatchID: id, + MinibatchIndex: 12, + BlobHeaderHashes: [][32]byte{{1}}, + BatchSize: 1, + ReferenceBlockNumber: 1, + } + err = minibatchStore.PutMinibatch(ctx, minibatch) + assert.NoError(t, err) + m, err := minibatchStore.GetMinibatch(ctx, minibatch.BatchID, minibatch.MinibatchIndex) + assert.NoError(t, err) + assert.Equal(t, minibatch, m) +} + +func TestGetLatestFormedBatch(t *testing.T) { + ctx := context.Background() + id1, _ := uuid.NewV7() + id2, _ := uuid.NewV7() + ts := time.Now().Truncate(time.Second).UTC() + ts2 := ts.Add(10 * time.Second) + batch1 := &batcher.BatchRecord{ + ID: id1, + CreatedAt: ts, + ReferenceBlockNumber: 1, + Status: batcher.BatchStatusFormed, + HeaderHash: [32]byte{1}, + AggregatePubKey: nil, + AggregateSignature: nil, + } + minibatch1 := &batcher.MinibatchRecord{ + BatchID: id1, + MinibatchIndex: 1, + BlobHeaderHashes: [][32]byte{{1}}, + BatchSize: 1, + ReferenceBlockNumber: 1, + } + batch2 := &batcher.BatchRecord{ + ID: id2, + CreatedAt: ts2, + ReferenceBlockNumber: 1, + Status: batcher.BatchStatusFormed, + HeaderHash: [32]byte{1}, + AggregatePubKey: nil, + AggregateSignature: nil, + } + minibatch2 := &batcher.MinibatchRecord{ + BatchID: id2, + MinibatchIndex: 1, + BlobHeaderHashes: [][32]byte{{1}}, + BatchSize: 1, + ReferenceBlockNumber: 1, + } + minibatch3 := &batcher.MinibatchRecord{ + BatchID: id2, + MinibatchIndex: 2, + BlobHeaderHashes: [][32]byte{{1}}, + BatchSize: 1, + ReferenceBlockNumber: 1, + } + err := minibatchStore.PutBatch(ctx, batch1) + assert.NoError(t, err) + err = minibatchStore.PutMinibatch(ctx, minibatch1) + assert.NoError(t, err) + err = minibatchStore.PutBatch(ctx, batch2) + assert.NoError(t, err) + err = minibatchStore.PutMinibatch(ctx, minibatch2) + assert.NoError(t, err) + err = minibatchStore.PutMinibatch(ctx, minibatch3) + assert.NoError(t, err) + + batch, minibatches, err := minibatchStore.GetLatestFormedBatch(ctx) + assert.NoError(t, err) + assert.Equal(t, 2, len(minibatches)) + assert.Equal(t, batch.ID, batch2.ID) +} +func TestPutDispersalRequest(t *testing.T) { + ctx := context.Background() + id, err := uuid.NewV7() + assert.NoError(t, err) + ts := time.Now().Truncate(time.Second).UTC() + opID := core.OperatorID([32]byte{123}) + request := &batcher.DispersalRequest{ + BatchID: id, + MinibatchIndex: 0, + OperatorID: opID, + OperatorAddress: gcommon.HexToAddress("0x0"), + NumBlobs: 1, + RequestedAt: ts, + BlobHash: "blobHash", + MetadataHash: "metadataHash", + } + err = minibatchStore.PutDispersalRequest(ctx, request) + assert.NoError(t, err) + r, err := minibatchStore.GetDispersalRequest(ctx, request.BatchID, request.MinibatchIndex, opID) + assert.NoError(t, err) + assert.Equal(t, request, r) +} + +func TestPutDispersalResponse(t *testing.T) { + ctx := context.Background() + id, err := uuid.NewV7() + assert.NoError(t, err) + ts := time.Now().Truncate(time.Second).UTC() + opID := core.OperatorID([32]byte{123}) + blobHash := "blobHash" + metadataHash := "metadataHash" + response := &batcher.DispersalResponse{ + DispersalRequest: batcher.DispersalRequest{ + BatchID: id, + MinibatchIndex: 0, + OperatorID: opID, + OperatorAddress: gcommon.HexToAddress("0x0"), + NumBlobs: 1, + RequestedAt: ts, + BlobHash: blobHash, + MetadataHash: metadataHash, + }, + Signatures: nil, + RespondedAt: ts, + Error: nil, + } + err = minibatchStore.PutDispersalResponse(ctx, response) + assert.NoError(t, err) + r, err := minibatchStore.GetDispersalResponse(ctx, response.BatchID, response.MinibatchIndex, opID) + assert.NoError(t, err) + assert.Equal(t, response, r) +} + +func TestDispersalStatus(t *testing.T) { + ctx := context.Background() + id, err := uuid.NewV7() + assert.NoError(t, err) + ts := time.Now().Truncate(time.Second).UTC() + opID := core.OperatorID([32]byte{123}) + blobHash := "blobHash" + metadataHash := "metadataHash" + + // no dispersals + dispersed, err := minibatchStore.BatchDispersed(ctx, id) + assert.NoError(t, err) + assert.False(t, dispersed) + + request := &batcher.DispersalRequest{ + BatchID: id, + MinibatchIndex: 0, + OperatorID: opID, + OperatorAddress: gcommon.HexToAddress("0x0"), + NumBlobs: 1, + RequestedAt: ts, + BlobHash: blobHash, + MetadataHash: metadataHash, + } + err = minibatchStore.PutDispersalRequest(ctx, request) + assert.NoError(t, err) + + // dispersal request but no response + dispersed, err = minibatchStore.BatchDispersed(ctx, id) + assert.NoError(t, err) + assert.False(t, dispersed) + + response := &batcher.DispersalResponse{ + DispersalRequest: batcher.DispersalRequest{ + BatchID: id, + MinibatchIndex: 0, + OperatorID: opID, + OperatorAddress: gcommon.HexToAddress("0x0"), + NumBlobs: 1, + RequestedAt: ts, + BlobHash: blobHash, + MetadataHash: metadataHash, + }, + Signatures: nil, + RespondedAt: ts, + Error: nil, + } + err = minibatchStore.PutDispersalResponse(ctx, response) + assert.NoError(t, err) + + // dispersal request and response + dispersed, err = minibatchStore.BatchDispersed(ctx, id) + assert.NoError(t, err) + assert.True(t, dispersed) +} diff --git a/disperser/batcher/grpc/dispatcher.go b/disperser/batcher/grpc/dispatcher.go index 9c14a433b8..ca84504ffb 100644 --- a/disperser/batcher/grpc/dispatcher.go +++ b/disperser/batcher/grpc/dispatcher.go @@ -3,6 +3,7 @@ package dispatcher import ( "context" "errors" + "fmt" "time" commonpb "github.com/Layr-Labs/eigenda/api/grpc/common" @@ -18,7 +19,8 @@ import ( ) type Config struct { - Timeout time.Duration + Timeout time.Duration + EnableGnarkBundleEncoding bool } type dispatcher struct { @@ -126,7 +128,7 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage, ctx, cancel := context.WithTimeout(ctx, c.Timeout) defer cancel() start := time.Now() - request, totalSize, err := GetStoreChunksRequest(blobs, batchHeader) + request, totalSize, err := GetStoreChunksRequest(blobs, batchHeader, c.EnableGnarkBundleEncoding) if err != nil { return nil, err } @@ -167,7 +169,7 @@ func (c *dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.Blob ctx, cancel := context.WithTimeout(ctx, c.Timeout) defer cancel() start := time.Now() - request, totalSize, err := GetStoreBlobsRequest(blobs, batchHeader) + request, totalSize, err := GetStoreBlobsRequest(blobs, batchHeader, c.EnableGnarkBundleEncoding) if err != nil { return nil, err } @@ -196,12 +198,94 @@ func (c *dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.Blob return signatures, nil } -func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader) (*node.StoreChunksRequest, int64, error) { +func (c *dispatcher) AttestBatch(ctx context.Context, state *core.IndexedOperatorState, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) (chan core.SigningMessage, error) { + batchHeaderHash, err := batchHeader.GetBatchHeaderHash() + if err != nil { + return nil, err + } + responseChan := make(chan core.SigningMessage, len(state.IndexedOperators)) + + for id, op := range state.IndexedOperators { + go func(op core.IndexedOperatorInfo, id core.OperatorID) { + conn, err := grpc.Dial( + core.OperatorSocket(op.Socket).GetDispersalSocket(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + c.logger.Error("disperser cannot connect to operator dispersal socket", "socket", core.OperatorSocket(op.Socket).GetDispersalSocket(), "err", err) + return + } + defer conn.Close() + + nodeClient := node.NewDispersalClient(conn) + + requestedAt := time.Now() + sig, err := c.SendAttestBatchRequest(ctx, nodeClient, blobHeaderHashes, batchHeader, &op) + latencyMs := float64(time.Since(requestedAt).Milliseconds()) + if err != nil { + responseChan <- core.SigningMessage{ + Err: err, + Signature: nil, + Operator: id, + BatchHeaderHash: batchHeaderHash, + AttestationLatencyMs: latencyMs, + } + c.metrics.ObserveLatency(id.Hex(), false, latencyMs) + } else { + responseChan <- core.SigningMessage{ + Signature: sig, + Operator: id, + BatchHeaderHash: batchHeaderHash, + AttestationLatencyMs: latencyMs, + Err: nil, + } + c.metrics.ObserveLatency(id.Hex(), true, latencyMs) + } + }(core.IndexedOperatorInfo{ + PubkeyG1: op.PubkeyG1, + PubkeyG2: op.PubkeyG2, + Socket: op.Socket, + }, id) + } + + return responseChan, nil +} + +func (c *dispatcher) SendAttestBatchRequest(ctx context.Context, nodeDispersalClient node.DispersalClient, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) { + ctx, cancel := context.WithTimeout(ctx, c.Timeout) + defer cancel() + // start := time.Now() + hashes := make([][]byte, len(blobHeaderHashes)) + for i, hash := range blobHeaderHashes { + hashes[i] = hash[:] + } + + request := &node.AttestBatchRequest{ + BatchHeader: getBatchHeaderMessage(batchHeader), + BlobHeaderHashes: hashes, + } + + c.logger.Debug("sending AttestBatch request to operator", "operator", op.Socket, "numBlobs", len(blobHeaderHashes), "requestMessageSize", proto.Size(request), "referenceBlockNumber", batchHeader.ReferenceBlockNumber) + opt := grpc.MaxCallSendMsgSize(60 * 1024 * 1024 * 1024) + reply, err := nodeDispersalClient.AttestBatch(ctx, request, opt) + if err != nil { + return nil, fmt.Errorf("failed to send AttestBatch request to operator %s: %w", core.OperatorSocket(op.Socket).GetDispersalSocket(), err) + } + + sigBytes := reply.GetSignature() + point, err := new(core.Signature).Deserialize(sigBytes) + if err != nil { + return nil, fmt.Errorf("failed to deserialize signature: %w", err) + } + return &core.Signature{G1Point: point}, nil +} + +func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader, useGnarkBundleEncoding bool) (*node.StoreChunksRequest, int64, error) { blobs := make([]*node.Blob, len(blobMessages)) totalSize := int64(0) for i, blob := range blobMessages { var err error - blobs[i], err = getBlobMessage(blob) + blobs[i], err = getBlobMessage(blob, useGnarkBundleEncoding) if err != nil { return nil, 0, err } @@ -216,12 +300,12 @@ func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.B return request, totalSize, nil } -func GetStoreBlobsRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader) (*node.StoreBlobsRequest, int64, error) { +func GetStoreBlobsRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader, useGnarkBundleEncoding bool) (*node.StoreBlobsRequest, int64, error) { blobs := make([]*node.Blob, len(blobMessages)) totalSize := int64(0) for i, blob := range blobMessages { var err error - blobs[i], err = getBlobMessage(blob) + blobs[i], err = getBlobMessage(blob, useGnarkBundleEncoding) if err != nil { return nil, 0, err } @@ -236,7 +320,7 @@ func GetStoreBlobsRequest(blobMessages []*core.BlobMessage, batchHeader *core.Ba return request, totalSize, nil } -func getBlobMessage(blob *core.BlobMessage) (*node.Blob, error) { +func getBlobMessage(blob *core.BlobMessage, useGnarkBundleEncoding bool) (*node.Blob, error) { if blob.BlobHeader == nil { return nil, errors.New("blob header is nil") } @@ -273,22 +357,43 @@ func getBlobMessage(blob *core.BlobMessage) (*node.Blob, error) { } } - data, err := blob.Bundles.Serialize() - if err != nil { - return nil, err - } bundles := make([]*node.Bundle, len(quorumHeaders)) - // the ordering of quorums in bundles must be same as in quorumHeaders - for i, quorumHeader := range quorumHeaders { - quorum := quorumHeader.QuorumId - if _, ok := blob.Bundles[uint8(quorum)]; ok { - bundles[i] = &node.Bundle{ - Chunks: data[quorum], + if useGnarkBundleEncoding { + // the ordering of quorums in bundles must be same as in quorumHeaders + for i, quorumHeader := range quorumHeaders { + quorum := quorumHeader.QuorumId + if bundle, ok := blob.Bundles[uint8(quorum)]; ok { + bundleBytes, err := bundle.Serialize() + if err != nil { + return nil, err + } + bundles[i] = &node.Bundle{ + Bundle: bundleBytes, + } + } else { + bundles[i] = &node.Bundle{ + // empty bundle for quorums operators are not part of + Bundle: make([]byte, 0), + } } - } else { - bundles[i] = &node.Bundle{ - // empty bundle for quorums operators are not part of - Chunks: make([][]byte, 0), + } + } else { + data, err := blob.Bundles.Serialize() + if err != nil { + return nil, err + } + // the ordering of quorums in bundles must be same as in quorumHeaders + for i, quorumHeader := range quorumHeaders { + quorum := quorumHeader.QuorumId + if _, ok := blob.Bundles[uint8(quorum)]; ok { + bundles[i] = &node.Bundle{ + Chunks: data[quorum], + } + } else { + bundles[i] = &node.Bundle{ + // empty bundle for quorums operators are not part of + Chunks: make([][]byte, 0), + } } } } diff --git a/disperser/batcher/grpc/dispatcher_test.go b/disperser/batcher/grpc/dispatcher_test.go new file mode 100644 index 0000000000..0a6ea539f2 --- /dev/null +++ b/disperser/batcher/grpc/dispatcher_test.go @@ -0,0 +1,59 @@ +package dispatcher_test + +import ( + "context" + "math/big" + "testing" + "time" + + grpcMock "github.com/Layr-Labs/eigenda/api/grpc/mock" + "github.com/Layr-Labs/eigenda/api/grpc/node" + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigenda/disperser/batcher" + dispatcher "github.com/Layr-Labs/eigenda/disperser/batcher/grpc" + "github.com/consensys/gnark-crypto/ecc/bn254" + "github.com/consensys/gnark-crypto/ecc/bn254/fp" + "github.com/stretchr/testify/assert" +) + +func newDispatcher(t *testing.T, config *dispatcher.Config) disperser.Dispatcher { + loggerConfig := common.DefaultLoggerConfig() + logger, err := common.NewLogger(loggerConfig) + assert.NoError(t, err) + metrics := batcher.NewMetrics("9091", logger) + return dispatcher.NewDispatcher(config, logger, metrics.DispatcherMetrics) +} + +func TestSendAttestBatchRequest(t *testing.T) { + dispatcher := newDispatcher(t, &dispatcher.Config{ + Timeout: 5 * time.Second, + }) + nodeClient := grpcMock.NewMockDispersalClient() + var X, Y fp.Element + X = *X.SetBigInt(big.NewInt(1)) + Y = *Y.SetBigInt(big.NewInt(2)) + signature := &core.Signature{ + G1Point: &core.G1Point{ + G1Affine: &bn254.G1Affine{ + X: X, + Y: Y, + }, + }, + } + sigBytes := signature.Bytes() + nodeClient.On("AttestBatch").Return(&node.AttestBatchReply{ + Signature: sigBytes[:], + }, nil) + sigReply, err := dispatcher.SendAttestBatchRequest(context.Background(), nodeClient, [][32]byte{{1}}, &core.BatchHeader{ + ReferenceBlockNumber: 10, + BatchRoot: [32]byte{1}, + }, &core.IndexedOperatorInfo{ + PubkeyG1: nil, + PubkeyG2: nil, + Socket: "localhost:8080", + }) + assert.NoError(t, err) + assert.Equal(t, signature, sigReply) +} diff --git a/disperser/batcher/inmem/minibatch_store.go b/disperser/batcher/inmem/minibatch_store.go index c11e7b2406..132d090a67 100644 --- a/disperser/batcher/inmem/minibatch_store.go +++ b/disperser/batcher/inmem/minibatch_store.go @@ -1,23 +1,30 @@ package inmem import ( - "fmt" + "context" + "errors" + "sort" + "sync" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/disperser/batcher" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/google/uuid" ) +var BatchNotFound = errors.New("batch not found") + type minibatchStore struct { // BatchRecords maps batch IDs to batch records BatchRecords map[uuid.UUID]*batcher.BatchRecord // MinibatchRecords maps batch IDs to a map from minibatch indices to minibatch records MinibatchRecords map[uuid.UUID]map[uint]*batcher.MinibatchRecord // DispersalRequests maps batch IDs to a map from minibatch indices to dispersal requests - DispersalRequests map[uuid.UUID]map[uint]*batcher.DispersalRequest + DispersalRequests map[uuid.UUID]map[uint][]*batcher.DispersalRequest // DispersalResponses maps batch IDs to a map from minibatch indices to dispersal responses - DispersalResponses map[uuid.UUID]map[uint]*batcher.DispersalResponse + DispersalResponses map[uuid.UUID]map[uint][]*batcher.DispersalResponse + mu sync.RWMutex logger logging.Logger } @@ -27,28 +34,65 @@ func NewMinibatchStore(logger logging.Logger) batcher.MinibatchStore { return &minibatchStore{ BatchRecords: make(map[uuid.UUID]*batcher.BatchRecord), MinibatchRecords: make(map[uuid.UUID]map[uint]*batcher.MinibatchRecord), - DispersalRequests: make(map[uuid.UUID]map[uint]*batcher.DispersalRequest), - DispersalResponses: make(map[uuid.UUID]map[uint]*batcher.DispersalResponse), + DispersalRequests: make(map[uuid.UUID]map[uint][]*batcher.DispersalRequest), + DispersalResponses: make(map[uuid.UUID]map[uint][]*batcher.DispersalResponse), logger: logger, } } -func (m *minibatchStore) PutBatch(batch *batcher.BatchRecord) error { +func (m *minibatchStore) PutBatch(ctx context.Context, batch *batcher.BatchRecord) error { + m.mu.Lock() + defer m.mu.Unlock() + m.BatchRecords[batch.ID] = batch return nil } -func (m *minibatchStore) GetBatch(batchID uuid.UUID) (*batcher.BatchRecord, error) { +func (m *minibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batcher.BatchRecord, error) { + m.mu.RLock() + defer m.mu.RUnlock() + b, ok := m.BatchRecords[batchID] if !ok { - return nil, fmt.Errorf("batch not found") + return nil, BatchNotFound } return b, nil } -func (m *minibatchStore) PutMiniBatch(minibatch *batcher.MinibatchRecord) error { +func (m *minibatchStore) GetBatchesByStatus(ctx context.Context, status batcher.BatchStatus) ([]*batcher.BatchRecord, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + var batches []*batcher.BatchRecord + for _, b := range m.BatchRecords { + if b.Status == status { + batches = append(batches, b) + } + } + sort.Slice(batches, func(i, j int) bool { + return batches[i].CreatedAt.Before(batches[j].CreatedAt) + }) + return batches, nil +} + +func (m *minibatchStore) UpdateBatchStatus(ctx context.Context, batchID uuid.UUID, status batcher.BatchStatus) error { + m.mu.Lock() + defer m.mu.Unlock() + + b, ok := m.BatchRecords[batchID] + if !ok { + return BatchNotFound + } + b.Status = status + return nil +} + +func (m *minibatchStore) PutMinibatch(ctx context.Context, minibatch *batcher.MinibatchRecord) error { + m.mu.Lock() + defer m.mu.Unlock() + if _, ok := m.MinibatchRecords[minibatch.BatchID]; !ok { m.MinibatchRecords[minibatch.BatchID] = make(map[uint]*batcher.MinibatchRecord) } @@ -57,47 +101,207 @@ func (m *minibatchStore) PutMiniBatch(minibatch *batcher.MinibatchRecord) error return nil } -func (m *minibatchStore) GetMiniBatch(batchID uuid.UUID, minibatchIndex uint) (*batcher.MinibatchRecord, error) { +func (m *minibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.MinibatchRecord, error) { + m.mu.RLock() + defer m.mu.RUnlock() + if _, ok := m.MinibatchRecords[batchID]; !ok { - return nil, nil + return nil, BatchNotFound } return m.MinibatchRecords[batchID][minibatchIndex], nil } -func (m *minibatchStore) PutDispersalRequest(request *batcher.DispersalRequest) error { +func (m *minibatchStore) GetMinibatches(ctx context.Context, batchID uuid.UUID) ([]*batcher.MinibatchRecord, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + if _, ok := m.MinibatchRecords[batchID]; !ok { + return nil, nil + } + + res := make([]*batcher.MinibatchRecord, 0, len(m.MinibatchRecords[batchID])) + for _, minibatch := range m.MinibatchRecords[batchID] { + res = append(res, minibatch) + } + sort.Slice(res, func(i, j int) bool { + return res[i].MinibatchIndex < res[j].MinibatchIndex + }) + + return res, nil +} + +func (m *minibatchStore) PutDispersalRequest(ctx context.Context, request *batcher.DispersalRequest) error { + m.mu.Lock() + defer m.mu.Unlock() + if _, ok := m.DispersalRequests[request.BatchID]; !ok { - m.DispersalRequests[request.BatchID] = make(map[uint]*batcher.DispersalRequest) + m.DispersalRequests[request.BatchID] = make(map[uint][]*batcher.DispersalRequest) } - m.DispersalRequests[request.BatchID][request.MinibatchIndex] = request + + if _, ok := m.DispersalRequests[request.BatchID][request.MinibatchIndex]; !ok { + m.DispersalRequests[request.BatchID][request.MinibatchIndex] = make([]*batcher.DispersalRequest, 0) + } + + for _, r := range m.DispersalRequests[request.BatchID][request.MinibatchIndex] { + if r.OperatorID == request.OperatorID { + // replace existing record + *r = *request + return nil + } + } + + m.DispersalRequests[request.BatchID][request.MinibatchIndex] = append(m.DispersalRequests[request.BatchID][request.MinibatchIndex], request) return nil } -func (m *minibatchStore) GetDispersalRequest(batchID uuid.UUID, minibatchIndex uint) (*batcher.DispersalRequest, error) { +func (m *minibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*batcher.DispersalRequest, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + requests, err := m.GetMinibatchDispersalRequests(ctx, batchID, minibatchIndex) + if err != nil { + return nil, err + } + for _, r := range requests { + if r.OperatorID == opID { + return r, nil + } + } + return nil, nil +} + +func (m *minibatchStore) GetMinibatchDispersalRequests(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalRequest, error) { + m.mu.RLock() + defer m.mu.RUnlock() + if _, ok := m.DispersalRequests[batchID]; !ok { - return nil, nil + return nil, BatchNotFound } return m.DispersalRequests[batchID][minibatchIndex], nil } -func (m *minibatchStore) PutDispersalResponse(response *batcher.DispersalResponse) error { +func (m *minibatchStore) PutDispersalResponse(ctx context.Context, response *batcher.DispersalResponse) error { + m.mu.Lock() + defer m.mu.Unlock() + if _, ok := m.DispersalResponses[response.BatchID]; !ok { - m.DispersalResponses[response.BatchID] = make(map[uint]*batcher.DispersalResponse) + m.DispersalResponses[response.BatchID] = make(map[uint][]*batcher.DispersalResponse) + } + + if _, ok := m.DispersalResponses[response.BatchID][response.MinibatchIndex]; !ok { + m.DispersalResponses[response.BatchID][response.MinibatchIndex] = make([]*batcher.DispersalResponse, 0) + } + + for _, r := range m.DispersalResponses[response.BatchID][response.MinibatchIndex] { + if r.OperatorID == response.OperatorID { + // replace existing record + *r = *response + return nil + } } - m.DispersalResponses[response.BatchID][response.MinibatchIndex] = response + + m.DispersalResponses[response.BatchID][response.MinibatchIndex] = append(m.DispersalResponses[response.BatchID][response.MinibatchIndex], response) return nil } -func (m *minibatchStore) GetDispersalResponse(batchID uuid.UUID, minibatchIndex uint) (*batcher.DispersalResponse, error) { +func (m *minibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*batcher.DispersalResponse, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + responses, err := m.GetMinibatchDispersalResponses(ctx, batchID, minibatchIndex) + if err != nil { + return nil, err + } + for _, r := range responses { + if r.OperatorID == opID { + return r, nil + } + } + return nil, nil +} + +func (m *minibatchStore) GetMinibatchDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalResponse, error) { + m.mu.RLock() + defer m.mu.RUnlock() + if _, ok := m.DispersalResponses[batchID]; !ok { - return nil, nil + return nil, BatchNotFound } return m.DispersalResponses[batchID][minibatchIndex], nil } -func (m *minibatchStore) GetPendingBatch() (*batcher.BatchRecord, error) { - return nil, nil +func (m *minibatchStore) GetLatestFormedBatch(ctx context.Context) (batch *batcher.BatchRecord, minibatches []*batcher.MinibatchRecord, err error) { + m.mu.RLock() + defer m.mu.RUnlock() + + batches, err := m.GetBatchesByStatus(ctx, batcher.BatchStatusFormed) + if err != nil { + return nil, nil, err + } + if len(batches) == 0 { + return nil, nil, nil + } + + batch = batches[0] + minibatches, err = m.GetMinibatches(ctx, batches[0].ID) + if err != nil { + return nil, nil, err + } + + return batch, minibatches, nil +} + +func (m *minibatchStore) getDispersals(ctx context.Context, batchID uuid.UUID) ([]*batcher.DispersalRequest, []*batcher.DispersalResponse, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + if _, ok := m.DispersalRequests[batchID]; !ok { + return nil, nil, BatchNotFound + } + + if _, ok := m.DispersalResponses[batchID]; !ok { + return nil, nil, BatchNotFound + } + + requests := make([]*batcher.DispersalRequest, 0) + for _, reqs := range m.DispersalRequests[batchID] { + requests = append(requests, reqs...) + } + + responses := make([]*batcher.DispersalResponse, 0) + for _, resp := range m.DispersalResponses[batchID] { + responses = append(responses, resp...) + } + + return requests, responses, nil +} + +func (m *minibatchStore) BatchDispersed(ctx context.Context, batchID uuid.UUID) (bool, error) { + dispersed := true + requests, responses, err := m.getDispersals(ctx, batchID) + if err != nil { + return false, err + } + + if len(requests) == 0 || len(responses) == 0 { + return false, nil + } + + if len(requests) != len(responses) { + m.logger.Info("number of minibatch dispersal requests does not match the number of responses", "batchID", batchID, "numRequests", len(requests), "numResponses", len(responses)) + return false, nil + } + + for _, resp := range responses { + if resp.RespondedAt.IsZero() { + dispersed = false + m.logger.Info("response pending", "batchID", batchID, "minibatchIndex", resp.MinibatchIndex, "operatorID", resp.OperatorID.Hex()) + } + } + + return dispersed, nil } diff --git a/disperser/batcher/inmem/minibatch_store_test.go b/disperser/batcher/inmem/minibatch_store_test.go index dbe0a01054..29b30294e4 100644 --- a/disperser/batcher/inmem/minibatch_store_test.go +++ b/disperser/batcher/inmem/minibatch_store_test.go @@ -1,6 +1,7 @@ package inmem_test import ( + "context" "testing" "time" @@ -25,13 +26,15 @@ func TestPutBatch(t *testing.T) { ID: id, CreatedAt: time.Now().UTC(), ReferenceBlockNumber: 1, + Status: 1, HeaderHash: [32]byte{1}, AggregatePubKey: nil, AggregateSignature: nil, } - err = s.PutBatch(batch) + ctx := context.Background() + err = s.PutBatch(ctx, batch) assert.NoError(t, err) - b, err := s.GetBatch(batch.ID) + b, err := s.GetBatch(ctx, batch.ID) assert.NoError(t, err) assert.Equal(t, batch, b) } @@ -47,9 +50,10 @@ func TestPutMiniBatch(t *testing.T) { BatchSize: 1, ReferenceBlockNumber: 1, } - err = s.PutMiniBatch(minibatch) + ctx := context.Background() + err = s.PutMinibatch(ctx, minibatch) assert.NoError(t, err) - m, err := s.GetMiniBatch(minibatch.BatchID, minibatch.MinibatchIndex) + m, err := s.GetMinibatch(ctx, minibatch.BatchID, minibatch.MinibatchIndex) assert.NoError(t, err) assert.Equal(t, minibatch, m) } @@ -58,41 +62,98 @@ func TestPutDispersalRequest(t *testing.T) { s := newMinibatchStore() id, err := uuid.NewV7() assert.NoError(t, err) - request := &batcher.DispersalRequest{ + minibatchIndex := uint(0) + ctx := context.Background() + req1 := &batcher.DispersalRequest{ BatchID: id, - MinibatchIndex: 0, + MinibatchIndex: minibatchIndex, OperatorID: core.OperatorID([32]byte{1}), OperatorAddress: gcommon.HexToAddress("0x0"), NumBlobs: 1, RequestedAt: time.Now().UTC(), + BlobHash: "1a2b", + MetadataHash: "3c4d", } - err = s.PutDispersalRequest(request) + err = s.PutDispersalRequest(ctx, req1) assert.NoError(t, err) - r, err := s.GetDispersalRequest(request.BatchID, request.MinibatchIndex) + req2 := &batcher.DispersalRequest{ + BatchID: id, + MinibatchIndex: minibatchIndex, + OperatorID: core.OperatorID([32]byte{2}), + OperatorAddress: gcommon.HexToAddress("0x0"), + NumBlobs: 1, + RequestedAt: time.Now().UTC(), + BlobHash: "1a2b", + MetadataHash: "3c4d", + } + err = s.PutDispersalRequest(ctx, req2) + assert.NoError(t, err) + + r, err := s.GetMinibatchDispersalRequests(ctx, id, minibatchIndex) assert.NoError(t, err) - assert.Equal(t, request, r) + assert.Len(t, r, 2) + assert.Equal(t, req1, r[0]) + assert.Equal(t, req2, r[1]) + + req, err := s.GetDispersalRequest(ctx, id, minibatchIndex, req1.OperatorID) + assert.NoError(t, err) + assert.Equal(t, req1, req) + + req, err = s.GetDispersalRequest(ctx, id, minibatchIndex, req2.OperatorID) + assert.NoError(t, err) + assert.Equal(t, req2, req) } func TestPutDispersalResponse(t *testing.T) { s := newMinibatchStore() id, err := uuid.NewV7() assert.NoError(t, err) - response := &batcher.DispersalResponse{ + ctx := context.Background() + minibatchIndex := uint(0) + resp1 := &batcher.DispersalResponse{ DispersalRequest: batcher.DispersalRequest{ BatchID: id, - MinibatchIndex: 0, + MinibatchIndex: minibatchIndex, OperatorID: core.OperatorID([32]byte{1}), OperatorAddress: gcommon.HexToAddress("0x0"), NumBlobs: 1, RequestedAt: time.Now().UTC(), + BlobHash: "1a2b", + MetadataHash: "3c4d", }, Signatures: nil, RespondedAt: time.Now().UTC(), Error: nil, } - err = s.PutDispersalResponse(response) + resp2 := &batcher.DispersalResponse{ + DispersalRequest: batcher.DispersalRequest{ + BatchID: id, + MinibatchIndex: minibatchIndex, + OperatorID: core.OperatorID([32]byte{2}), + OperatorAddress: gcommon.HexToAddress("0x0"), + NumBlobs: 1, + RequestedAt: time.Now().UTC(), + BlobHash: "0x0", + MetadataHash: "0x0", + }, + Signatures: nil, + RespondedAt: time.Now().UTC(), + Error: nil, + } + err = s.PutDispersalResponse(ctx, resp1) assert.NoError(t, err) - r, err := s.GetDispersalResponse(response.BatchID, response.MinibatchIndex) + err = s.PutDispersalResponse(ctx, resp2) + assert.NoError(t, err) + + r, err := s.GetMinibatchDispersalResponses(ctx, id, minibatchIndex) + assert.NoError(t, err) + assert.Len(t, r, 2) + + resp, err := s.GetDispersalResponse(ctx, id, minibatchIndex, resp1.OperatorID) + assert.NoError(t, err) + assert.Equal(t, resp1, resp) + + resp, err = s.GetDispersalResponse(ctx, id, minibatchIndex, resp2.OperatorID) assert.NoError(t, err) - assert.Equal(t, response, r) + assert.Equal(t, resp2, resp) } diff --git a/disperser/batcher/minibatch_store.go b/disperser/batcher/minibatch_store.go index a3b45e20f7..d028113214 100644 --- a/disperser/batcher/minibatch_store.go +++ b/disperser/batcher/minibatch_store.go @@ -1,6 +1,7 @@ package batcher import ( + "context" "time" "github.com/Layr-Labs/eigenda/core" @@ -8,17 +9,37 @@ import ( "github.com/google/uuid" ) +type BatchStatus uint + +// Pending: the batch has been created and minibatches are being added. There can be only one pending batch at a time. +// Formed: the batch has been formed and no more minibatches can be added. Implies that all minibatch records and dispersal request records have been created. +// +// Attested: the batch has been attested. +// Failed: the batch has failed. +// +// The batch lifecycle is as follows: +// Pending -> Formed -> Attested +// \ / +// \-> Failed <-/ +const ( + BatchStatusPending BatchStatus = iota + BatchStatusFormed + BatchStatusAttested + BatchStatusFailed +) + type BatchRecord struct { - ID uuid.UUID + ID uuid.UUID `dynamodbav:"-"` CreatedAt time.Time ReferenceBlockNumber uint + Status BatchStatus `dynamodbav:"BatchStatus"` HeaderHash [32]byte AggregatePubKey *core.G2Point AggregateSignature *core.Signature } type MinibatchRecord struct { - BatchID uuid.UUID + BatchID uuid.UUID `dynamodbav:"-"` MinibatchIndex uint BlobHeaderHashes [][32]byte BatchSize uint64 // in bytes @@ -26,12 +47,15 @@ type MinibatchRecord struct { } type DispersalRequest struct { - BatchID uuid.UUID - MinibatchIndex uint - core.OperatorID + BatchID uuid.UUID `dynamodbav:"-"` + MinibatchIndex uint + core.OperatorID `dynamodbav:"-"` OperatorAddress gcommon.Address + Socket string NumBlobs uint RequestedAt time.Time + BlobHash string + MetadataHash string } type DispersalResponse struct { @@ -42,13 +66,23 @@ type DispersalResponse struct { } type MinibatchStore interface { - PutBatch(batch *BatchRecord) error - GetBatch(batchID uuid.UUID) (*BatchRecord, error) - PutMiniBatch(minibatch *MinibatchRecord) error - GetMiniBatch(batchID uuid.UUID, minibatchIndex uint) (*MinibatchRecord, error) - PutDispersalRequest(request *DispersalRequest) error - GetDispersalRequest(batchID uuid.UUID, minibatchIndex uint) (*DispersalRequest, error) - PutDispersalResponse(response *DispersalResponse) error - GetDispersalResponse(batchID uuid.UUID, minibatchIndex uint) (*DispersalResponse, error) - GetPendingBatch() (*BatchRecord, error) + PutBatch(ctx context.Context, batch *BatchRecord) error + GetBatch(ctx context.Context, batchID uuid.UUID) (*BatchRecord, error) + GetBatchesByStatus(ctx context.Context, status BatchStatus) ([]*BatchRecord, error) + UpdateBatchStatus(ctx context.Context, batchID uuid.UUID, status BatchStatus) error + PutMinibatch(ctx context.Context, minibatch *MinibatchRecord) error + GetMinibatch(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*MinibatchRecord, error) + GetMinibatches(ctx context.Context, batchID uuid.UUID) ([]*MinibatchRecord, error) + PutDispersalRequest(ctx context.Context, request *DispersalRequest) error + GetDispersalRequest(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*DispersalRequest, error) + GetMinibatchDispersalRequests(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*DispersalRequest, error) + PutDispersalResponse(ctx context.Context, response *DispersalResponse) error + GetDispersalResponse(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*DispersalResponse, error) + GetMinibatchDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*DispersalResponse, error) + + // GetLatestFormedBatch returns the latest batch that has been formed. + // If there is no formed batch, it returns nil. + // It also returns the minibatches that belong to the batch in the ascending order of minibatch index. + GetLatestFormedBatch(ctx context.Context) (batch *BatchRecord, minibatches []*MinibatchRecord, err error) + BatchDispersed(ctx context.Context, batchID uuid.UUID) (bool, error) } diff --git a/disperser/batcher/minibatcher.go b/disperser/batcher/minibatcher.go new file mode 100644 index 0000000000..c6c83e2a74 --- /dev/null +++ b/disperser/batcher/minibatcher.go @@ -0,0 +1,311 @@ +package batcher + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/google/uuid" + "github.com/hashicorp/go-multierror" +) + +type MinibatcherConfig struct { + PullInterval time.Duration + MaxNumConnections uint + MaxNumRetriesPerBlob uint + MaxNumRetriesPerDispersal uint +} + +type Minibatcher struct { + MinibatcherConfig + + BlobStore disperser.BlobStore + MinibatchStore MinibatchStore + Dispatcher disperser.Dispatcher + ChainState core.IndexedChainState + AssignmentCoordinator core.AssignmentCoordinator + EncodingStreamer *EncodingStreamer + Pool common.WorkerPool + + // local state + ReferenceBlockNumber uint + BatchID uuid.UUID + MinibatchIndex uint + + ethClient common.EthClient + logger logging.Logger +} + +func NewMinibatcher( + config MinibatcherConfig, + blobStore disperser.BlobStore, + minibatchStore MinibatchStore, + dispatcher disperser.Dispatcher, + chainState core.IndexedChainState, + assignmentCoordinator core.AssignmentCoordinator, + // aggregator core.SignatureAggregator, + encodingStreamer *EncodingStreamer, + ethClient common.EthClient, + workerpool common.WorkerPool, + logger logging.Logger, +) (*Minibatcher, error) { + return &Minibatcher{ + MinibatcherConfig: config, + BlobStore: blobStore, + MinibatchStore: minibatchStore, + Dispatcher: dispatcher, + ChainState: chainState, + AssignmentCoordinator: assignmentCoordinator, + EncodingStreamer: encodingStreamer, + Pool: workerpool, + + ReferenceBlockNumber: 0, + BatchID: uuid.Nil, + MinibatchIndex: 0, + + ethClient: ethClient, + logger: logger.With("component", "Minibatcher"), + }, nil +} + +func (b *Minibatcher) Start(ctx context.Context) error { + err := b.ChainState.Start(ctx) + if err != nil { + return err + } + // Wait for few seconds for indexer to index blockchain + // This won't be needed when we switch to using Graph node + time.Sleep(indexerWarmupDelay) + go func() { + ticker := time.NewTicker(b.PullInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := b.HandleSingleBatch(ctx); err != nil { + if errors.Is(err, errNoEncodedResults) { + b.logger.Warn("no encoded results to make a batch with") + } else { + b.logger.Error("failed to process a batch", "err", err) + } + } + } + } + }() + + return nil +} + +func (b *Minibatcher) handleFailure(ctx context.Context, blobMetadatas []*disperser.BlobMetadata, reason FailReason) error { + var result *multierror.Error + numPermanentFailures := 0 + for _, metadata := range blobMetadatas { + b.EncodingStreamer.RemoveEncodedBlob(metadata) + retry, err := b.BlobStore.HandleBlobFailure(ctx, metadata, b.MaxNumRetriesPerBlob) + if err != nil { + b.logger.Error("HandleSingleBatch: error handling blob failure", "err", err) + // Append the error + result = multierror.Append(result, err) + } + + if retry { + continue + } + numPermanentFailures++ + } + + // Return the error(s) + return result.ErrorOrNil() +} + +func (b *Minibatcher) HandleSingleBatch(ctx context.Context) error { + log := b.logger + // If too many dispersal requests are pending, skip an iteration + if pending := b.Pool.WaitingQueueSize(); pending > int(b.MaxNumConnections) { + return fmt.Errorf("too many pending requests %d with max number of connections %d. skipping minibatch iteration", pending, b.MaxNumConnections) + } + stageTimer := time.Now() + // All blobs in this batch are marked as DISPERSING + batch, err := b.EncodingStreamer.CreateMinibatch(ctx) + if err != nil { + return err + } + log.Debug("CreateMinibatch took", "duration", time.Since(stageTimer).String()) + + // Processing new full batch + if b.ReferenceBlockNumber < batch.BatchHeader.ReferenceBlockNumber { + // Update status of the previous batch + if b.BatchID != uuid.Nil { + err = b.MinibatchStore.UpdateBatchStatus(ctx, b.BatchID, BatchStatusFormed) + if err != nil { + _ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error updating batch status")) + return fmt.Errorf("error updating batch status: %w", err) + } + } + + // Create new batch + b.BatchID, err = uuid.NewV7() + if err != nil { + _ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error generating batch UUID")) + return fmt.Errorf("error generating batch ID: %w", err) + } + batchHeaderHash, err := batch.BatchHeader.GetBatchHeaderHash() + if err != nil { + _ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error getting batch header hash")) + return fmt.Errorf("error getting batch header hash: %w", err) + } + b.MinibatchIndex = 0 + b.ReferenceBlockNumber = batch.BatchHeader.ReferenceBlockNumber + err = b.MinibatchStore.PutBatch(ctx, &BatchRecord{ + ID: b.BatchID, + CreatedAt: time.Now().UTC(), + ReferenceBlockNumber: b.ReferenceBlockNumber, + HeaderHash: batchHeaderHash, + }) + if err != nil { + _ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error storing batch record")) + return fmt.Errorf("error storing batch record: %w", err) + } + } + + // Store minibatch record + blobHeaderHashes := make([][32]byte, 0, len(batch.EncodedBlobs)) + batchSize := int64(0) + for _, blob := range batch.EncodedBlobs { + h, err := blob.BlobHeader.GetBlobHeaderHash() + if err != nil { + _ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error getting blob header hash")) + return fmt.Errorf("error getting blob header hash: %w", err) + } + blobHeaderHashes = append(blobHeaderHashes, h) + batchSize += blob.BlobHeader.EncodedSizeAllQuorums() + } + err = b.MinibatchStore.PutMinibatch(ctx, &MinibatchRecord{ + BatchID: b.BatchID, + MinibatchIndex: b.MinibatchIndex, + BlobHeaderHashes: blobHeaderHashes, + BatchSize: uint64(batchSize), + ReferenceBlockNumber: b.ReferenceBlockNumber, + }) + if err != nil { + _ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error storing minibatch record")) + return fmt.Errorf("error storing minibatch record: %w", err) + } + + // Dispatch encoded batch + log.Debug("Dispatching encoded batch...", "batchID", b.BatchID, "minibatchIndex", b.MinibatchIndex, "referenceBlockNumber", b.ReferenceBlockNumber, "numBlobs", len(batch.EncodedBlobs)) + stageTimer = time.Now() + b.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader, b.BatchID, b.MinibatchIndex) + log.Debug("DisperseBatch took", "duration", time.Since(stageTimer).String()) + + h, err := batch.State.OperatorState.Hash() + if err != nil { + log.Error("error getting operator state hash", "err", err) + } + hStr := make([]string, 0, len(h)) + for q, hash := range h { + hStr = append(hStr, fmt.Sprintf("%d: %x", q, hash)) + } + log.Info("Successfully dispatched minibatch", "operatorStateHash", hStr) + + b.MinibatchIndex++ + + return nil +} + +func (b *Minibatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, batchID uuid.UUID, minibatchIndex uint) { + for id, op := range state.IndexedOperators { + opInfo := op + opID := id + req := &DispersalRequest{ + BatchID: batchID, + MinibatchIndex: minibatchIndex, + OperatorID: opID, + Socket: op.Socket, + NumBlobs: uint(len(blobs)), + RequestedAt: time.Now().UTC(), + } + err := b.MinibatchStore.PutDispersalRequest(ctx, req) + if err != nil { + b.logger.Error("failed to put dispersal request", "err", err) + continue + } + b.Pool.Submit(func() { + signatures, err := b.SendBlobsToOperatorWithRetries(ctx, blobs, batchHeader, opInfo, opID, int(b.MaxNumRetriesPerDispersal)) + if err != nil { + b.logger.Errorf("failed to send blobs to operator %s: %v", opID.Hex(), err) + } + // Update the minibatch state + err = b.MinibatchStore.PutDispersalResponse(ctx, &DispersalResponse{ + DispersalRequest: *req, + Signatures: signatures, + RespondedAt: time.Now().UTC(), + Error: err, + }) + if err != nil { + b.logger.Error("failed to put dispersal response", "err", err) + } + }) + } +} + +func (b *Minibatcher) SendBlobsToOperatorWithRetries( + ctx context.Context, + blobs []core.EncodedBlob, + batchHeader *core.BatchHeader, + op *core.IndexedOperatorInfo, + opID core.OperatorID, + maxNumRetries int, +) ([]*core.Signature, error) { + blobMessages := make([]*core.BlobMessage, 0) + hasAnyBundles := false + for _, blob := range blobs { + if _, ok := blob.BundlesByOperator[opID]; ok { + hasAnyBundles = true + } + blobMessages = append(blobMessages, &core.BlobMessage{ + BlobHeader: blob.BlobHeader, + // Bundles will be empty if the operator is not in the quorums blob is dispersed on + Bundles: blob.BundlesByOperator[opID], + }) + } + if !hasAnyBundles { + // Operator is not part of any quorum, no need to send chunks + return nil, fmt.Errorf("operator %s is not part of any quorum", opID.Hex()) + } + + numRetries := 0 + // initially set the timeout equal to the pull interval with exponential backoff + timeout := b.PullInterval + var signatures []*core.Signature + var err error + for numRetries < maxNumRetries { + requestedAt := time.Now() + ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + signatures, err = b.Dispatcher.SendBlobsToOperator(ctxWithTimeout, blobMessages, batchHeader, op) + latencyMs := float64(time.Since(requestedAt).Milliseconds()) + if err != nil { + b.logger.Error("error sending chunks to operator", "operator", opID.Hex(), "err", err, "timeout", timeout.String(), "numRetries", numRetries, "maxNumRetries", maxNumRetries) + numRetries++ + timeout *= 2 + continue + } + b.logger.Info("sent chunks to operator", "operator", opID.Hex(), "latencyMs", latencyMs) + break + } + + if signatures == nil && err != nil { + return nil, fmt.Errorf("failed to send chunks to operator %s: %w", opID.Hex(), err) + } + + return signatures, nil +} diff --git a/disperser/batcher/minibatcher_test.go b/disperser/batcher/minibatcher_test.go new file mode 100644 index 0000000000..400b5120ca --- /dev/null +++ b/disperser/batcher/minibatcher_test.go @@ -0,0 +1,357 @@ +package batcher_test + +import ( + "context" + "errors" + "math/big" + "testing" + "time" + + cmock "github.com/Layr-Labs/eigenda/common/mock" + commonmock "github.com/Layr-Labs/eigenda/common/mock" + "github.com/Layr-Labs/eigenda/core" + coremock "github.com/Layr-Labs/eigenda/core/mock" + "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigenda/disperser/batcher" + "github.com/Layr-Labs/eigenda/disperser/batcher/inmem" + dinmem "github.com/Layr-Labs/eigenda/disperser/common/inmem" + dmock "github.com/Layr-Labs/eigenda/disperser/mock" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/consensys/gnark-crypto/ecc/bn254" + "github.com/consensys/gnark-crypto/ecc/bn254/fp" + "github.com/gammazero/workerpool" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +var ( + opId0, _ = core.OperatorIDFromHex("e22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311") + opId1, _ = core.OperatorIDFromHex("e23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568312") + mockChainState, _ = coremock.NewChainDataMock(map[uint8]map[core.OperatorID]int{ + 0: { + opId0: 1, + opId1: 1, + }, + 1: { + opId0: 1, + }, + }) + defaultConfig = batcher.MinibatcherConfig{ + PullInterval: 1 * time.Second, + MaxNumConnections: 3, + MaxNumRetriesPerDispersal: 3, + } + initialBlock = uint(10) +) + +type minibatcherComponents struct { + minibatcher *batcher.Minibatcher + blobStore disperser.BlobStore + minibatchStore batcher.MinibatchStore + dispatcher *dmock.Dispatcher + chainState *coremock.MockIndexedChainState + assignmentCoordinator core.AssignmentCoordinator + encodingStreamer *batcher.EncodingStreamer + pool *workerpool.WorkerPool + ethClient *commonmock.MockEthClient + logger logging.Logger +} + +func newMinibatcher(t *testing.T, config batcher.MinibatcherConfig) *minibatcherComponents { + logger := logging.NewNoopLogger() + blobStore := dinmem.NewBlobStore() + minibatchStore := inmem.NewMinibatchStore(logger) + chainState, err := coremock.NewChainDataMock(mockChainState.Stakes) + assert.NoError(t, err) + state := chainState.GetTotalOperatorState(context.Background(), 0) + dispatcher := dmock.NewDispatcher(state) + ics := &coremock.MockIndexedChainState{} + streamerConfig := batcher.StreamerConfig{ + SRSOrder: 3000, + EncodingRequestTimeout: 5 * time.Second, + EncodingQueueLimit: 10, + TargetNumChunks: 8092, + MaxBlobsToFetchFromStore: 10, + FinalizationBlockDelay: 0, + ChainStateTimeout: 5 * time.Second, + } + encodingWorkerPool := workerpool.New(10) + p, err := makeTestProver() + assert.NoError(t, err) + encoderClient := disperser.NewLocalEncoderClient(p) + asgn := &core.StdAssignmentCoordinator{} + chainState.On("GetCurrentBlockNumber").Return(initialBlock, nil) + metrics := batcher.NewMetrics("9100", logger) + trigger := batcher.NewEncodedSizeNotifier( + make(chan struct{}, 1), + 10*1024*1024, + ) + encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, chainState, encoderClient, asgn, trigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger) + assert.NoError(t, err) + ethClient := &cmock.MockEthClient{} + pool := workerpool.New(int(config.MaxNumConnections)) + m, err := batcher.NewMinibatcher(config, blobStore, minibatchStore, dispatcher, chainState, asgn, encodingStreamer, ethClient, pool, logger) + assert.NoError(t, err) + return &minibatcherComponents{ + minibatcher: m, + blobStore: blobStore, + minibatchStore: minibatchStore, + dispatcher: dispatcher, + chainState: ics, + assignmentCoordinator: asgn, + encodingStreamer: encodingStreamer, + pool: pool, + ethClient: ethClient, + logger: logger, + } +} + +func TestDisperseMinibatch(t *testing.T) { + c := newMinibatcher(t, defaultConfig) + var X, Y fp.Element + X = *X.SetBigInt(big.NewInt(1)) + Y = *Y.SetBigInt(big.NewInt(2)) + c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*core.Signature{ + { + G1Point: &core.G1Point{ + G1Affine: &bn254.G1Affine{ + X: X, + Y: Y, + }, + }, + }, + }, nil) + ctx := context.Background() + + blob1 := makeTestBlob([]*core.SecurityParam{{ + QuorumID: 0, + AdversaryThreshold: 80, + ConfirmationThreshold: 100, + }}) + blob2 := makeTestBlob([]*core.SecurityParam{{ + QuorumID: 1, + AdversaryThreshold: 70, + ConfirmationThreshold: 100, + }}) + _, _ = queueBlob(t, ctx, &blob1, c.blobStore) + _, _ = queueBlob(t, ctx, &blob2, c.blobStore) + + // Start the batcher + out := make(chan batcher.EncodingResultOrStatus) + err := c.encodingStreamer.RequestEncoding(ctx, out) + assert.NoError(t, err) + err = c.encodingStreamer.ProcessEncodedBlobs(ctx, <-out) + assert.NoError(t, err) + err = c.encodingStreamer.ProcessEncodedBlobs(ctx, <-out) + assert.NoError(t, err) + count, _ := c.encodingStreamer.EncodedBlobstore.GetEncodedResultSize() + assert.Equal(t, 2, count) + + err = c.minibatcher.HandleSingleBatch(ctx) + assert.NoError(t, err) + assert.NotNil(t, c.minibatcher.BatchID) + assert.Equal(t, c.minibatcher.MinibatchIndex, uint(1)) + assert.Equal(t, c.minibatcher.ReferenceBlockNumber, initialBlock) + + b, err := c.minibatchStore.GetBatch(ctx, c.minibatcher.BatchID) + assert.NoError(t, err) + assert.NotNil(t, b) + assert.Equal(t, c.minibatcher.BatchID, b.ID) + assert.NotNil(t, b.HeaderHash) + assert.NotNil(t, b.CreatedAt) + assert.Equal(t, c.minibatcher.ReferenceBlockNumber, b.ReferenceBlockNumber) + mb, err := c.minibatchStore.GetMinibatch(ctx, c.minibatcher.BatchID, 0) + assert.NoError(t, err) + assert.NotNil(t, mb) + assert.Equal(t, c.minibatcher.BatchID, mb.BatchID) + assert.Equal(t, uint(0), mb.MinibatchIndex) + assert.Len(t, mb.BlobHeaderHashes, 2) + assert.Equal(t, uint64(12800), mb.BatchSize) + assert.Equal(t, c.minibatcher.ReferenceBlockNumber, mb.ReferenceBlockNumber) + + c.pool.StopWait() + c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 2) + dispersalRequests, err := c.minibatchStore.GetMinibatchDispersalRequests(ctx, c.minibatcher.BatchID, 0) + assert.NoError(t, err) + assert.Len(t, dispersalRequests, 2) + opIDs := make([]core.OperatorID, 2) + for i, req := range dispersalRequests { + assert.Equal(t, req.BatchID, c.minibatcher.BatchID) + assert.Equal(t, req.MinibatchIndex, uint(0)) + assert.Equal(t, req.NumBlobs, uint(2)) + assert.NotNil(t, req.Socket) + assert.NotNil(t, req.RequestedAt) + opIDs[i] = req.OperatorID + } + assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1}) + + dispersalResponses, err := c.minibatchStore.GetMinibatchDispersalResponses(ctx, c.minibatcher.BatchID, 0) + assert.NoError(t, err) + assert.Len(t, dispersalResponses, 2) + for _, resp := range dispersalResponses { + assert.Equal(t, resp.BatchID, c.minibatcher.BatchID) + assert.Equal(t, resp.MinibatchIndex, uint(0)) + assert.NotNil(t, resp.RespondedAt) + assert.NoError(t, resp.Error) + assert.Len(t, resp.Signatures, 1) + } +} + +func TestDisperseMinibatchFailure(t *testing.T) { + c := newMinibatcher(t, defaultConfig) + var X, Y fp.Element + X = *X.SetBigInt(big.NewInt(1)) + Y = *Y.SetBigInt(big.NewInt(2)) + c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*core.Signature{ + { + G1Point: &core.G1Point{ + G1Affine: &bn254.G1Affine{ + X: X, + Y: Y, + }, + }, + }, + }, nil) + ctx := context.Background() + + blob1 := makeTestBlob([]*core.SecurityParam{{ + QuorumID: 0, + AdversaryThreshold: 80, + ConfirmationThreshold: 100, + }}) + blob2 := makeTestBlob([]*core.SecurityParam{{ + QuorumID: 1, + AdversaryThreshold: 70, + ConfirmationThreshold: 100, + }}) + _, _ = queueBlob(t, ctx, &blob1, c.blobStore) + _, _ = queueBlob(t, ctx, &blob2, c.blobStore) + + // Start the batcher + out := make(chan batcher.EncodingResultOrStatus) + err := c.encodingStreamer.RequestEncoding(ctx, out) + assert.NoError(t, err) + err = c.encodingStreamer.ProcessEncodedBlobs(ctx, <-out) + assert.NoError(t, err) + err = c.encodingStreamer.ProcessEncodedBlobs(ctx, <-out) + assert.NoError(t, err) + count, _ := c.encodingStreamer.EncodedBlobstore.GetEncodedResultSize() + assert.Equal(t, 2, count) + + err = c.minibatcher.HandleSingleBatch(ctx) + assert.NoError(t, err) + assert.NotNil(t, c.minibatcher.BatchID) + assert.Equal(t, c.minibatcher.MinibatchIndex, uint(1)) + assert.Equal(t, c.minibatcher.ReferenceBlockNumber, initialBlock) + + b, err := c.minibatchStore.GetBatch(ctx, c.minibatcher.BatchID) + assert.NoError(t, err) + assert.NotNil(t, b) + assert.Equal(t, c.minibatcher.BatchID, b.ID) + assert.NotNil(t, b.HeaderHash) + assert.NotNil(t, b.CreatedAt) + assert.Equal(t, c.minibatcher.ReferenceBlockNumber, b.ReferenceBlockNumber) + mb, err := c.minibatchStore.GetMinibatch(ctx, c.minibatcher.BatchID, 0) + assert.NoError(t, err) + assert.NotNil(t, mb) + assert.Equal(t, c.minibatcher.BatchID, mb.BatchID) + assert.Equal(t, uint(0), mb.MinibatchIndex) + assert.Len(t, mb.BlobHeaderHashes, 2) + assert.Equal(t, uint64(12800), mb.BatchSize) + assert.Equal(t, c.minibatcher.ReferenceBlockNumber, mb.ReferenceBlockNumber) + + c.pool.StopWait() + c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 2) + dispersalRequests, err := c.minibatchStore.GetMinibatchDispersalRequests(ctx, c.minibatcher.BatchID, 0) + assert.NoError(t, err) + assert.Len(t, dispersalRequests, 2) + opIDs := make([]core.OperatorID, 2) + for i, req := range dispersalRequests { + assert.Equal(t, req.BatchID, c.minibatcher.BatchID) + assert.Equal(t, req.MinibatchIndex, uint(0)) + assert.Equal(t, req.NumBlobs, uint(2)) + assert.NotNil(t, req.Socket) + assert.NotNil(t, req.RequestedAt) + opIDs[i] = req.OperatorID + } + assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1}) + + dispersalResponses, err := c.minibatchStore.GetMinibatchDispersalResponses(ctx, c.minibatcher.BatchID, 0) + assert.NoError(t, err) + assert.Len(t, dispersalResponses, 2) + for _, resp := range dispersalResponses { + assert.Equal(t, resp.BatchID, c.minibatcher.BatchID) + assert.Equal(t, resp.MinibatchIndex, uint(0)) + assert.NotNil(t, resp.RespondedAt) + assert.NoError(t, resp.Error) + assert.Len(t, resp.Signatures, 1) + } +} + +func TestSendBlobsToOperatorWithRetries(t *testing.T) { + c := newMinibatcher(t, defaultConfig) + var X, Y fp.Element + X = *X.SetBigInt(big.NewInt(1)) + Y = *Y.SetBigInt(big.NewInt(2)) + sig := &core.Signature{ + G1Point: &core.G1Point{ + G1Affine: &bn254.G1Affine{ + X: X, + Y: Y, + }, + }, + } + ctx := context.Background() + + blob1 := makeTestBlob([]*core.SecurityParam{{ + QuorumID: 0, + AdversaryThreshold: 80, + ConfirmationThreshold: 100, + }}) + blob2 := makeTestBlob([]*core.SecurityParam{{ + QuorumID: 1, + AdversaryThreshold: 70, + ConfirmationThreshold: 100, + }}) + _, _ = queueBlob(t, ctx, &blob1, c.blobStore) + _, _ = queueBlob(t, ctx, &blob2, c.blobStore) + + // Start the batcher + out := make(chan batcher.EncodingResultOrStatus) + err := c.encodingStreamer.RequestEncoding(ctx, out) + assert.NoError(t, err) + err = c.encodingStreamer.ProcessEncodedBlobs(ctx, <-out) + assert.NoError(t, err) + err = c.encodingStreamer.ProcessEncodedBlobs(ctx, <-out) + assert.NoError(t, err) + count, _ := c.encodingStreamer.EncodedBlobstore.GetEncodedResultSize() + assert.Equal(t, 2, count) + batch, err := c.encodingStreamer.CreateMinibatch(ctx) + assert.NoError(t, err) + + c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fail")).Twice() + c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*core.Signature{sig}, nil).Once() + signatures, err := c.minibatcher.SendBlobsToOperatorWithRetries(ctx, batch.EncodedBlobs, batch.BatchHeader, batch.State.IndexedOperators[opId0], opId0, 3) + c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 3) + assert.NoError(t, err) + assert.Len(t, signatures, 1) + + c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fail")).Times(3) + c.dispatcher.On("SendBlobsToOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*core.Signature{sig}, nil).Once() + signatures, err = c.minibatcher.SendBlobsToOperatorWithRetries(ctx, batch.EncodedBlobs, batch.BatchHeader, batch.State.IndexedOperators[opId1], opId1, 3) + c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 6) + assert.ErrorContains(t, err, "failed to send chunks to operator") + assert.Nil(t, signatures) +} + +func TestMinibatcherTooManyPendingRequests(t *testing.T) { + c := newMinibatcher(t, defaultConfig) + ctx := context.Background() + mockWorkerPool := &cmock.MockWorkerpool{} + // minibatcher with mock worker pool + m, err := batcher.NewMinibatcher(defaultConfig, c.blobStore, c.minibatchStore, c.dispatcher, c.minibatcher.ChainState, c.assignmentCoordinator, c.encodingStreamer, c.ethClient, mockWorkerPool, c.logger) + assert.NoError(t, err) + mockWorkerPool.On("WaitingQueueSize").Return(int(defaultConfig.MaxNumConnections + 1)).Once() + err = m.HandleSingleBatch(ctx) + assert.ErrorContains(t, err, "too many pending requests") +} diff --git a/disperser/cmd/apiserver/config.go b/disperser/cmd/apiserver/config.go index dffb7afc21..03d592aac8 100644 --- a/disperser/cmd/apiserver/config.go +++ b/disperser/cmd/apiserver/config.go @@ -49,9 +49,8 @@ func NewConfig(ctx *cli.Context) (Config, error) { config := Config{ AwsClientConfig: aws.ReadClientConfig(ctx, flags.FlagPrefix), ServerConfig: disperser.ServerConfig{ - GrpcPort: ctx.GlobalString(flags.GrpcPortFlag.Name), - GrpcTimeout: ctx.GlobalDuration(flags.GrpcTimeoutFlag.Name), - EnableDualQuorums: ctx.GlobalBool(flags.EnableDualQuorums.Name), + GrpcPort: ctx.GlobalString(flags.GrpcPortFlag.Name), + GrpcTimeout: ctx.GlobalDuration(flags.GrpcTimeoutFlag.Name), }, BlobstoreConfig: blobstore.Config{ BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name), diff --git a/disperser/cmd/apiserver/flags/flags.go b/disperser/cmd/apiserver/flags/flags.go index dc13c1155c..ebc28b5f0d 100644 --- a/disperser/cmd/apiserver/flags/flags.go +++ b/disperser/cmd/apiserver/flags/flags.go @@ -87,12 +87,6 @@ var ( EnvVar: common.PrefixEnvVar(envVarPrefix, "RATE_BUCKET_STORE_SIZE"), Required: false, } - EnableDualQuorums = cli.BoolTFlag{ - Name: common.PrefixFlag(FlagPrefix, "enable-dual-quorums"), - Usage: "Whether to enable dual quorum staking. If false, only quorum 0 is used as required quorum", - Required: false, - EnvVar: common.PrefixEnvVar(envVarPrefix, "ENABLE_DUAL_QUORUMS"), - } ) var requiredFlags = []cli.Flag{ @@ -110,7 +104,6 @@ var optionalFlags = []cli.Flag{ EnableRatelimiter, BucketStoreSize, GrpcTimeoutFlag, - EnableDualQuorums, } // Flags contains the list of configuration options available to the binary. diff --git a/disperser/cmd/batcher/config.go b/disperser/cmd/batcher/config.go index 7918a5e9f0..9ad499aa40 100644 --- a/disperser/cmd/batcher/config.go +++ b/disperser/cmd/batcher/config.go @@ -33,6 +33,8 @@ type Config struct { EigenDAServiceManagerAddr string EnableMinibatch bool + + EnableGnarkBundleEncoding bool } func NewConfig(ctx *cli.Context) (Config, error) { @@ -88,6 +90,7 @@ func NewConfig(ctx *cli.Context) (Config, error) { IndexerConfig: indexer.ReadIndexerConfig(ctx), KMSKeyConfig: kmsConfig, EnableMinibatch: ctx.Bool(flags.EnableMinibatchFlag.Name), + EnableGnarkBundleEncoding: ctx.Bool(flags.EnableGnarkBundleEncodingFlag.Name), } return config, nil } diff --git a/disperser/cmd/batcher/flags/flags.go b/disperser/cmd/batcher/flags/flags.go index 6dddb76cb0..91c5f5ea6f 100644 --- a/disperser/cmd/batcher/flags/flags.go +++ b/disperser/cmd/batcher/flags/flags.go @@ -193,6 +193,12 @@ var ( EnvVar: common.PrefixEnvVar(envVarPrefix, "FINALIZATION_BLOCK_DELAY"), Value: 75, } + EnableGnarkBundleEncodingFlag = cli.BoolFlag{ + Name: common.PrefixFlag(FlagPrefix, "enable-gnark-bundle-encoding"), + Usage: "Enable Gnark bundle encoding for chunks", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "ENABLE_GNARK_BUNDLE_ENCODING"), + } // EnableMinibatchFlag is a flag to enable minibatch processing // Defaults to false EnableMinibatchFlag = cli.BoolFlag{ @@ -201,6 +207,27 @@ var ( Required: false, EnvVar: common.PrefixEnvVar(envVarPrefix, "ENABLE_MINIBATCH"), } + MinibatcherPullIntervalFlag = cli.DurationFlag{ + Name: common.PrefixFlag(FlagPrefix, "minibatcher-pull-interval"), + Usage: "Interval at which to pull from the queue and disperse a minibatch. Only used when minibatching is enabled. Defaults to 5s.", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "MINIBATCHER_PULL_INTERVAL"), + Value: 5 * time.Second, + } + MaxNodeConnectionsFlag = cli.UintFlag{ + Name: common.PrefixFlag(FlagPrefix, "max-node-connections"), + Usage: "Maximum number of connections to the node. Only used when minibatching is enabled. Defaults to 1024.", + Required: false, + Value: 1024, + EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_NODE_CONNECTIONS"), + } + MaxNumRetriesPerDispersalFlag = cli.UintFlag{ + Name: common.PrefixFlag(FlagPrefix, "max-num-retries-per-dispersal"), + Usage: "Maximum number of retries to disperse a minibatch. Only used when minibatching is enabled. Defaults to 3.", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_NUM_RETRIES_PER_DISPERSAL"), + Value: 3, + } ) var requiredFlags = []cli.Flag{ @@ -234,6 +261,10 @@ var optionalFlags = []cli.Flag{ MaxBlobsToFetchFromStoreFlag, FinalizationBlockDelayFlag, EnableMinibatchFlag, + MinibatcherPullIntervalFlag, + MaxNodeConnectionsFlag, + MaxNumRetriesPerDispersalFlag, + EnableGnarkBundleEncodingFlag, } // Flags contains the list of configuration options available to the binary. diff --git a/disperser/cmd/batcher/main.go b/disperser/cmd/batcher/main.go index 49868b4a3c..3735940fe3 100644 --- a/disperser/cmd/batcher/main.go +++ b/disperser/cmd/batcher/main.go @@ -99,7 +99,8 @@ func RunBatcher(ctx *cli.Context) error { metrics := batcher.NewMetrics(config.MetricsConfig.HTTPPort, logger) dispatcher := dispatcher.NewDispatcher(&dispatcher.Config{ - Timeout: config.TimeoutConfig.AttestationTimeout, + Timeout: config.TimeoutConfig.AttestationTimeout, + EnableGnarkBundleEncoding: config.EnableGnarkBundleEncoding, }, logger, metrics.DispatcherMetrics) asgn := &core.StdAssignmentCoordinator{} diff --git a/disperser/disperser.go b/disperser/disperser.go index f9ea58782c..8603b4abfa 100644 --- a/disperser/disperser.go +++ b/disperser/disperser.go @@ -13,6 +13,7 @@ import ( "github.com/Layr-Labs/eigenda/encoding" disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser" + "github.com/Layr-Labs/eigenda/api/grpc/node" gcommon "github.com/ethereum/go-ethereum/common" ) @@ -178,6 +179,8 @@ type BlobStore interface { type Dispatcher interface { DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SigningMessage SendBlobsToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) + AttestBatch(ctx context.Context, state *core.IndexedOperatorState, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) (chan core.SigningMessage, error) + SendAttestBatchRequest(ctx context.Context, nodeDispersalClient node.DispersalClient, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) } // GenerateReverseIndexKey returns the key used to store the blob key in the reverse index diff --git a/disperser/mock/dispatcher.go b/disperser/mock/dispatcher.go index 8569e9779b..743de8ebd9 100644 --- a/disperser/mock/dispatcher.go +++ b/disperser/mock/dispatcher.go @@ -4,6 +4,7 @@ import ( "context" "errors" + "github.com/Layr-Labs/eigenda/api/grpc/node" "github.com/Layr-Labs/eigenda/core" coremock "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/disperser" @@ -72,3 +73,19 @@ func (d *Dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.Blob } return args.Get(0).([]*core.Signature), args.Error(1) } + +func (d *Dispatcher) AttestBatch(ctx context.Context, state *core.IndexedOperatorState, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) (chan core.SigningMessage, error) { + args := d.Called(ctx, state, blobHeaderHashes, batchHeader) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(chan core.SigningMessage), args.Error(1) +} + +func (d *Dispatcher) SendAttestBatchRequest(ctx context.Context, nodeDispersalClient node.DispersalClient, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) { + args := d.Called(ctx, nodeDispersalClient, blobHeaderHashes, batchHeader, op) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*core.Signature), args.Error(1) +} diff --git a/disperser/server_config.go b/disperser/server_config.go index 2c0b94a607..b094fca2a6 100644 --- a/disperser/server_config.go +++ b/disperser/server_config.go @@ -9,9 +9,4 @@ const ( type ServerConfig struct { GrpcPort string GrpcTimeout time.Duration - - // Feature flags - // Whether enable the dual quorums. - // If false, only quorum 0 will be used as required quorum. - EnableDualQuorums bool } diff --git a/docs/contributing.md b/docs/contributing.md index 8dda9618e3..82f69cb693 100644 --- a/docs/contributing.md +++ b/docs/contributing.md @@ -11,13 +11,12 @@ In general, the `core` project contains implementation of all the important busi # Directory structure
-┌── api Protobuf definitions and contract bindings
-├── clients: Client-side libraries for users to integrate with EigenDA
+┌── api Protobuf definitions, contract bindings and client-side libraries for users to integrate with EigenDA
 ├── common: Common utility libraries
 ├── contracts
 |   ├── eigenlayer-contracts: Contracts for the EigenLayer restaking platform
 ┌── core: Core logic of the EigenDA protocol
-├── disperser: Disperser service
+├── disperser: Disperser service including API server, encoder and batcher
 ├── docs: Documentation and specification
 ├── encoding: Encoding libraries such as Reed-Solomon, KZG
 ├── inabox: Inabox test to run EigenDA system on a single machine
diff --git a/docs/spec/attestation/amortized-proving.md b/docs/spec/attestation/amortized-proving.md
index 6d919d264c..bd5654ecd4 100644
--- a/docs/spec/attestation/amortized-proving.md
+++ b/docs/spec/attestation/amortized-proving.md
@@ -54,7 +54,7 @@ For the purposes of the KZG-FFT encoder, this means that we must choose $S$ to b
 
 ## Worked Example
 
-As a simple illustrative example, suppose that  `AssignmentCoordinator` provides the following parameters in order to meet the security requirements of given blob:
+As a simple illustrative example, suppose that  `AssignmentCoordinator` provides the following parameters in order to meet the security requirements of a given blob:
 - `ChunkLength` = 3
 - `NumChunks` = 4
 
diff --git a/docs/spec/old/indexer.md b/docs/spec/old/indexer.md
index b4b1c61dc3..37cd568161 100644
--- a/docs/spec/old/indexer.md
+++ b/docs/spec/old/indexer.md
@@ -173,7 +173,7 @@ type AccumulatorObject interface{
 
 type Accumulator interface{
 
-    // IndexHeaders accepts a list of incoming headers. Will throw an error is the accumulator does not have an existing header which can form a chain with the incoming headers. The Accumulator will discard any orphaned headers. 
+    // IndexHeaders accepts a list of incoming headers. Will throw an error if the accumulator does not have an existing header which can form a chain with the incoming headers. The Accumulator will discard any orphaned headers. 
     ProcessHeaders(headers []Headers) error
 
     // GetAccumulator accepts a header and returns the value of the accumulator at that header. Either the Number or BlockHash fields of the header can be used. 
diff --git a/docs/spec/overview.md b/docs/spec/overview.md
index 6e07a6e244..a4b47eab32 100644
--- a/docs/spec/overview.md
+++ b/docs/spec/overview.md
@@ -23,7 +23,7 @@ Because of this, EigenDA makes use of the EigenLayer state, which is stored on E
 
 ### A first of its kind, horizontally scalable DA solution
 
-Among extant DA solutions, EigenDA takes an approach to scalability which is unique in that it yields true horizontal scalability: Every additional unit of capacity contributed by a operator can increase the total system capacity. 
+Among extant DA solutions, EigenDA takes an approach to scalability which is unique in that it yields true horizontal scalability: Every additional unit of capacity contributed by an operator can increase the total system capacity. 
 
 This property is achieved by using a Reed Solomon erasure encoding scheme to shard the blob data across the DA nodes. While other systems such as Celestia and Danksharding (planned) also make use of Reed Solomon encoding, they do so only for the purpose of supporting certain observability properties of Data Availability Sampling (DAS) by light nodes. On the other hand, all incentivized/full nodes of the system download, store, and serve the full system bandwidth. 
 
@@ -44,7 +44,7 @@ EigenDA defines two properties of each blob attestation which relate to its live
 
 The term "first-order attack" alludes to the fact that exceeding the safety threshold may represent only a contingency rather than an actual safety failure due to the presence of recovery mechanisms that would apply during such a contingency. Discussion of such mechanisms is outside of the scope of the current documentation. 
 
-Safety thresholds can translate directly into cryptoeconomic safety properties for quorums consisting of tokens which experience toxicity in the event of publicly observable attacks by a large coalition of token holders. This an other discussions of cryptoeconomic security are also beyond the scope of this technical documentation. We restrict the discussion to illustrating how the protocol preserves the given safety and liveness thresholds. 
+Safety thresholds can translate directly into cryptoeconomic safety properties for quorums consisting of tokens which experience toxicity in the event of publicly observable attacks by a large coalition of token holders. This and other discussions of cryptoeconomic security are also beyond the scope of this technical documentation. We restrict the discussion to illustrating how the protocol preserves the given safety and liveness thresholds. 
 
 ## System Architecture
 
diff --git a/node/grpc/server.go b/node/grpc/server.go
index 977d1e5889..8287addddd 100644
--- a/node/grpc/server.go
+++ b/node/grpc/server.go
@@ -2,6 +2,7 @@ package grpc
 
 import (
 	"context"
+	"encoding/hex"
 	"errors"
 	"fmt"
 	"runtime"
@@ -302,10 +303,10 @@ func (s *Server) RetrieveChunks(ctx context.Context, in *pb.RetrieveChunksReques
 		return nil, errors.New("request rate limited")
 	}
 
-	chunks, format, ok := s.node.Store.GetChunks(ctx, batchHeaderHash, int(in.GetBlobIndex()), uint8(in.GetQuorumId()))
-	if !ok {
+	chunks, format, err := s.node.Store.GetChunks(ctx, batchHeaderHash, int(in.GetBlobIndex()), uint8(in.GetQuorumId()))
+	if err != nil {
 		s.node.Metrics.RecordRPCRequest("RetrieveChunks", "failure", time.Since(start))
-		return nil, fmt.Errorf("could not find chunks for batchHeaderHash %v, blob index: %v, quorumID: %v", batchHeaderHash, in.GetBlobIndex(), in.GetQuorumId())
+		return nil, fmt.Errorf("could not find chunks for batchHeaderHash %v, blob index: %v, quorumID: %v", hex.EncodeToString(batchHeaderHash[:]), in.GetBlobIndex(), in.GetQuorumId())
 	}
 	if !s.config.EnableGnarkBundleEncoding && format == pb.ChunkEncoding_GNARK {
 		format = pb.ChunkEncoding_GOB
diff --git a/node/grpc/server_load_test.go b/node/grpc/server_load_test.go
index 88a5a8256b..a0ab6bd72d 100644
--- a/node/grpc/server_load_test.go
+++ b/node/grpc/server_load_test.go
@@ -103,7 +103,7 @@ func TestStoreChunks(t *testing.T) {
 		numTotalChunks += len(blobMessagesByOp[opID][i].Bundles[0])
 	}
 	t.Logf("Batch numTotalChunks: %d", numTotalChunks)
-	req, totalSize, err := dispatcher.GetStoreChunksRequest(blobMessagesByOp[opID], batchHeader)
+	req, totalSize, err := dispatcher.GetStoreChunksRequest(blobMessagesByOp[opID], batchHeader, false)
 	fmt.Println("totalSize", totalSize)
 	assert.NoError(t, err)
 	assert.Equal(t, int64(26214400), totalSize)
diff --git a/node/store.go b/node/store.go
index 0e38be43d1..6b35e6270e 100644
--- a/node/store.go
+++ b/node/store.go
@@ -347,26 +347,26 @@ func (s *Store) GetBlobHeader(ctx context.Context, batchHeaderHash [32]byte, blo
 	return data, nil
 }
 
-// GetChunks returns the list of byte arrays stored for given blobKey along with a boolean
-// indicating if the read was unsuccessful or the chunks were serialized correctly
-func (s *Store) GetChunks(ctx context.Context, batchHeaderHash [32]byte, blobIndex int, quorumID core.QuorumID) ([][]byte, node.ChunkEncoding, bool) {
+// GetChunks returns the list of byte arrays stored for given blobKey along with the encoding
+// format of the bytes.
+func (s *Store) GetChunks(ctx context.Context, batchHeaderHash [32]byte, blobIndex int, quorumID core.QuorumID) ([][]byte, node.ChunkEncoding, error) {
 	log := s.logger
 
 	blobKey, err := EncodeBlobKey(batchHeaderHash, blobIndex, quorumID)
 	if err != nil {
-		return nil, node.ChunkEncoding_UNKNOWN, false
+		return nil, node.ChunkEncoding_UNKNOWN, err
 	}
 	data, err := s.db.Get(blobKey)
 	if err != nil {
-		return nil, node.ChunkEncoding_UNKNOWN, false
+		return nil, node.ChunkEncoding_UNKNOWN, err
 	}
 	log.Debug("Retrieved chunk", "blobKey", hexutil.Encode(blobKey), "length", len(data))
 
 	chunks, format, err := DecodeChunks(data)
 	if err != nil {
-		return nil, format, false
+		return nil, format, err
 	}
-	return chunks, format, true
+	return chunks, format, nil
 }
 
 // HasKey returns if a given key has been stored.
diff --git a/node/store_test.go b/node/store_test.go
index 7c6faf45ce..c4c2a8d389 100644
--- a/node/store_test.go
+++ b/node/store_test.go
@@ -331,12 +331,11 @@ func TestStoringBlob(t *testing.T) {
 
 func decodeChunks(t *testing.T, s *node.Store, batchHeaderHash [32]byte, blobIdx int, chunkEncoding pb.ChunkEncoding) []*encoding.Frame {
 	ctx := context.Background()
-	chunks, format, ok := s.GetChunks(ctx, batchHeaderHash, blobIdx, 0)
-	assert.True(t, ok)
+	chunks, format, err := s.GetChunks(ctx, batchHeaderHash, blobIdx, 0)
+	assert.Nil(t, err)
 	assert.Equal(t, 1, len(chunks))
 	assert.Equal(t, chunkEncoding, format)
 	var f *encoding.Frame
-	var err error
 	switch chunkEncoding {
 	case pb.ChunkEncoding_GOB:
 		f, err = new(encoding.Frame).Deserialize(chunks[0])