Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable minibatcher #701

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,9 @@ type BlobHeader struct {

// AccountID is the account that is paying for the blob to be stored
AccountID AccountID

// Reference block number
ReferenceBlockNumber uint
}

func (b *BlobHeader) GetQuorumInfo(quorum QuorumID) *BlobQuorumInfo {
Expand Down
12 changes: 11 additions & 1 deletion disperser/batcher/batch_confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type BatchConfirmerConfig struct {
DispersalStatusCheckInterval time.Duration
AttestationTimeout time.Duration
SRSOrder int
NumConnections int
NumConnections uint
MaxNumRetriesPerBlob uint
}

Expand All @@ -46,6 +46,7 @@ type BatchConfirmer struct {

ethClient common.EthClient
logger logging.Logger
Metrics *Metrics
}

func NewBatchConfirmer(
Expand All @@ -62,6 +63,7 @@ func NewBatchConfirmer(
txnManager TxnManager,
minibatcher *Minibatcher,
logger logging.Logger,
metrics *Metrics,
) (*BatchConfirmer, error) {
return &BatchConfirmer{
BatchConfirmerConfig: config,
Expand All @@ -76,6 +78,7 @@ func NewBatchConfirmer(
Transactor: transactor,
TransactionManager: txnManager,
Minibatcher: minibatcher,
Metrics: metrics,

ethClient: ethClient,
logger: logger.With("component", "BatchConfirmer"),
Expand Down Expand Up @@ -285,6 +288,11 @@ func (b *BatchConfirmer) handleFailure(ctx context.Context, batchID uuid.UUID, b
continue
}

if reason == FailNoSignatures {
b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.InsufficientSignatures)
} else {
b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.Failed)
}
numPermanentFailures++
}

Expand Down Expand Up @@ -324,6 +332,7 @@ func (b *BatchConfirmer) HandleSingleBatch(ctx context.Context) error {
}
}

b.logger.Info("Processing batch", "batchID", batch.ID)
// Make sure all minibatches in the batch have been dispersed
batchDispersed := false
stateUpdateTicker.Reset(b.DispersalStatusCheckInterval)
Expand All @@ -332,6 +341,7 @@ func (b *BatchConfirmer) HandleSingleBatch(ctx context.Context) error {
for !batchDispersed {
select {
case <-ctxWithTimeout.Done():
b.logger.Error("timed out waiting for batch to disperse")
return ctxWithTimeout.Err()
case <-stateUpdateTicker.C:
batchDispersed, err = b.MinibatchStore.BatchDispersed(ctx, batch.ID, batch.NumMinibatches)
Expand Down
7 changes: 3 additions & 4 deletions disperser/batcher/batch_confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
bat "github.com/Layr-Labs/eigenda/disperser/batcher"
batcherinmem "github.com/Layr-Labs/eigenda/disperser/batcher/inmem"
batchermock "github.com/Layr-Labs/eigenda/disperser/batcher/mock"
batmock "github.com/Layr-Labs/eigenda/disperser/batcher/mock"
"github.com/Layr-Labs/eigenda/disperser/common/inmem"
dmock "github.com/Layr-Labs/eigenda/disperser/mock"
"github.com/Layr-Labs/eigenda/encoding"
Expand Down Expand Up @@ -57,7 +56,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents {
dispatcher := dmock.NewDispatcher(state)
blobStore := inmem.NewBlobStore()
ethClient := &cmock.MockEthClient{}
txnManager := batmock.NewTxnManager()
txnManager := batchermock.NewTxnManager()
minibatchStore := batcherinmem.NewMinibatchStore(logger)
encodingWorkerPool := workerpool.New(10)
encoderProver, err = makeTestProver()
Expand All @@ -76,7 +75,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents {
MaxNumConnections: 10,
MaxNumRetriesPerBlob: 2,
MaxNumRetriesPerDispersal: 1,
}, blobStore, minibatchStore, dispatcher, mockChainState, assignmentCoordinator, encodingStreamer, ethClient, pool, logger)
}, blobStore, minibatchStore, dispatcher, mockChainState, encodingStreamer, pool, logger, metrics)
assert.NoError(t, err)

config := bat.BatchConfirmerConfig{
Expand All @@ -88,7 +87,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents {
SRSOrder: 3000,
MaxNumRetriesPerBlob: 2,
}
b, err := bat.NewBatchConfirmer(config, blobStore, minibatchStore, dispatcher, mockChainState, assignmentCoordinator, encodingStreamer, agg, ethClient, transactor, txnManager, minibatcher, logger)
b, err := bat.NewBatchConfirmer(config, blobStore, minibatchStore, dispatcher, mockChainState, assignmentCoordinator, encodingStreamer, agg, ethClient, transactor, txnManager, minibatcher, logger, metrics)
assert.NoError(t, err)

ethClient.On("BlockNumber").Return(uint64(initialBlock), nil)
Expand Down
10 changes: 10 additions & 0 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type QuorumInfo struct {

type TimeoutConfig struct {
EncodingTimeout time.Duration
DispersalTimeout time.Duration
AttestationTimeout time.Duration
ChainReadTimeout time.Duration
ChainWriteTimeout time.Duration
Expand All @@ -64,6 +65,11 @@ type Config struct {

TargetNumChunks uint
MaxBlobsToFetchFromStore int

EnableMinibatch bool
pschork marked this conversation as resolved.
Show resolved Hide resolved
BatchstoreTableName string
MinibatcherConfig MinibatcherConfig
DispersalStatusCheckInterval time.Duration
}

type Batcher struct {
Expand Down Expand Up @@ -105,6 +111,10 @@ func NewBatcher(
metrics *Metrics,
heartbeatChan chan time.Time,
) (*Batcher, error) {
if config.EnableMinibatch {
return nil, errors.New("minibatch is not supported")
}

batchTrigger := NewEncodedSizeNotifier(
make(chan struct{}, 1),
uint64(config.BatchSizeMBLimit)*1024*1024, // convert to bytes
Expand Down
2 changes: 0 additions & 2 deletions disperser/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type batcherComponents struct {
transactor *coremock.MockTransactor
txnManager *batchermock.MockTxnManager
blobStore disperser.BlobStore
encoderClient *disperser.LocalEncoderClient
encodingStreamer *bat.EncodingStreamer
ethClient *cmock.MockEthClient
dispatcher *dmock.Dispatcher
Expand Down Expand Up @@ -153,7 +152,6 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time.
transactor: transactor,
txnManager: txnManager,
blobStore: blobStore,
encoderClient: encoderClient,
encodingStreamer: b.EncodingStreamer,
ethClient: ethClient,
dispatcher: dispatcher,
Expand Down
32 changes: 31 additions & 1 deletion disperser/batcher/batchstore/minibatch_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package batchstore

import (
"context"
"errors"
"fmt"
"strconv"
"time"
Expand Down Expand Up @@ -35,7 +36,7 @@ type MinibatchStore struct {

var _ batcher.MinibatchStore = (*MinibatchStore)(nil)

func NewMinibatchStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *MinibatchStore {
func NewMinibatchStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) batcher.MinibatchStore {
logger.Debugf("creating minibatch store with table %s with TTL: %s", tableName, ttl)
return &MinibatchStore{
dynamoDBClient: dynamoDBClient,
Expand Down Expand Up @@ -145,6 +146,9 @@ func MarshalDispersal(response *batcher.MinibatchDispersal) (map[string]types.At
if err != nil {
return nil, err
}
if response.Error != nil {
fields["Error"] = &types.AttributeValueMemberS{Value: response.DispersalResponse.Error.Error()}
}
fields["BatchID"] = &types.AttributeValueMemberS{Value: response.BatchID.String()}
fields["SK"] = &types.AttributeValueMemberS{Value: dispersalSKPrefix + fmt.Sprintf("%d#%s", response.MinibatchIndex, response.OperatorID.Hex())}
fields["OperatorID"] = &types.AttributeValueMemberS{Value: response.OperatorID.Hex()}
Expand All @@ -158,6 +162,10 @@ func MarshalDispersalResponse(response *batcher.DispersalResponse) (map[string]t
if err != nil {
return nil, err
}
if response.Error != nil {
fields["Error"] = &types.AttributeValueMemberS{Value: response.Error.Error()}
}
fields["RespondedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.RespondedAt.UTC().Unix())}
return fields, nil
}

Expand Down Expand Up @@ -192,6 +200,22 @@ func UnmarshalBatchID(item commondynamodb.Item) (*uuid.UUID, error) {
return &batchID, nil
}

func UnmarshalError(item commondynamodb.Item) (error, error) {
type Error struct {
Error string
}
var e Error
err := attributevalue.UnmarshalMap(item, &e)
if err != nil {
return nil, err
}

if e.Error == "" {
return nil, nil
}
return errors.New(e.Error), nil
}

func UnmarshalBlobKey(item commondynamodb.Item) (*disperser.BlobKey, error) {
blobKey := disperser.BlobKey{}
err := attributevalue.UnmarshalMap(item, &blobKey)
Expand Down Expand Up @@ -257,6 +281,12 @@ func UnmarshalDispersal(item commondynamodb.Item) (*batcher.MinibatchDispersal,
}
response.OperatorID = *operatorID

dispersalError, err := UnmarshalError(item)
if err != nil {
return nil, err
}
response.OperatorID = *operatorID
response.Error = dispersalError
response.RespondedAt = response.RespondedAt.UTC()
response.RequestedAt = response.RequestedAt.UTC()
return &response, nil
Expand Down
31 changes: 30 additions & 1 deletion disperser/batcher/batchstore/minibatch_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package batchstore_test

import (
"context"
"errors"
"fmt"
"os"
"testing"
Expand Down Expand Up @@ -35,7 +36,7 @@ var (
localStackPort = "4566"

dynamoClient *dynamodb.Client
minibatchStore *batchstore.MinibatchStore
minibatchStore batcher.MinibatchStore

UUID = uuid.New()
minibatchTableName = fmt.Sprintf("test-MinibatchStore-%v", UUID)
Expand Down Expand Up @@ -237,6 +238,34 @@ func TestPutDispersal(t *testing.T) {
assert.Equal(t, response, r)
}

func TestDispersalError(t *testing.T) {
ctx := context.Background()
id, err := uuid.NewV7()
assert.NoError(t, err)
ts := time.Now().Truncate(time.Second).UTC()
opID := core.OperatorID([32]byte{123})
response := &batcher.MinibatchDispersal{
BatchID: id,
MinibatchIndex: 0,
OperatorID: opID,
OperatorAddress: gcommon.HexToAddress("0x0"),
Socket: "socket",
NumBlobs: 1,
RequestedAt: ts,
DispersalResponse: batcher.DispersalResponse{
Signatures: nil,
RespondedAt: ts,
Error: errors.New("error"),
},
}
err = minibatchStore.PutDispersal(ctx, response)
assert.NoError(t, err)
r, err := minibatchStore.GetDispersal(ctx, response.BatchID, response.MinibatchIndex, opID)
assert.NoError(t, err)
assert.Equal(t, response, r)
assert.Equal(t, response.DispersalResponse.Error.Error(), r.DispersalResponse.Error.Error())
}

func TestDispersalStatus(t *testing.T) {
ctx := context.Background()
id, err := uuid.NewV7()
Expand Down
3 changes: 2 additions & 1 deletion disperser/batcher/encoding_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,8 @@ func (e *EncodingStreamer) CreateMinibatch(ctx context.Context) (*batch, error)
metadataByKey[blobKey] = result.BlobMetadata
blobQuorums[blobKey] = make([]*core.BlobQuorumInfo, 0)
blobHeader := &core.BlobHeader{
BlobCommitments: *result.Commitment,
BlobCommitments: *result.Commitment,
ReferenceBlockNumber: e.ReferenceBlockNumber,
}
blobHeaderByKey[blobKey] = blobHeader
encodedBlobByKey[blobKey] = core.EncodedBlob{
Expand Down
11 changes: 6 additions & 5 deletions disperser/batcher/grpc/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,11 +409,12 @@ func getBlobMessage(blob *core.EncodedBlobMessage, useGnarkBundleEncoding bool)

return &node.Blob{
Header: &node.BlobHeader{
Commitment: commitData,
LengthCommitment: &lengthCommitData,
LengthProof: &lengthProofData,
Length: uint32(blob.BlobHeader.Length),
QuorumHeaders: quorumHeaders,
Commitment: commitData,
LengthCommitment: &lengthCommitData,
LengthProof: &lengthProofData,
Length: uint32(blob.BlobHeader.Length),
QuorumHeaders: quorumHeaders,
ReferenceBlockNumber: uint32(blob.BlobHeader.ReferenceBlockNumber),
},
Bundles: bundles,
}, nil
Expand Down
2 changes: 1 addition & 1 deletion disperser/batcher/minibatch_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type DispersalResponse struct {
// The signatures are compressed using G1Affine.Bytes(), to be decompressed using G1Affine.SetBytes()
Signatures [][32]byte
RespondedAt time.Time
Error error
Error error `dynamodbav:"-"`
}

type MinibatchDispersal struct {
Expand Down
Loading
Loading