diff --git a/disperser/batcher/batchstore/minibatch_store.go b/disperser/batcher/batchstore/minibatch_store.go index 5fe1bf3c79..c4f11b593c 100644 --- a/disperser/batcher/batchstore/minibatch_store.go +++ b/disperser/batcher/batchstore/minibatch_store.go @@ -8,6 +8,7 @@ import ( commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/disperser/batcher" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/aws/aws-sdk-go-v2/aws" @@ -18,11 +19,13 @@ import ( ) const ( - batchStatusIndexName = "BatchStatusIndex" - batchSKPrefix = "BATCH#" - minibatchSKPrefix = "MINIBATCH#" - dispersalRequestSKPrefix = "DISPERSAL_REQUEST#" - dispersalResponseSKPrefix = "DISPERSAL_RESPONSE#" + batchStatusIndexName = "BatchStatusIndex" + blobMinibatchMappingIndexName = "BlobMinibatchMappingIndex" + batchSKPrefix = "BATCH#" + minibatchSKPrefix = "MINIBATCH#" + dispersalRequestSKPrefix = "DISPERSAL_REQUEST#" + dispersalResponseSKPrefix = "DISPERSAL_RESPONSE#" + blobMinibatchMappingSKPrefix = "BLOB_MINIBATCH_MAPPING#" ) type MinibatchStore struct { @@ -63,6 +66,10 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit AttributeName: aws.String("CreatedAt"), AttributeType: types.ScalarAttributeTypeN, }, + { + AttributeName: aws.String("BlobHash"), + AttributeType: types.ScalarAttributeTypeS, + }, }, KeySchema: []types.KeySchemaElement{ { @@ -96,6 +103,26 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit WriteCapacityUnits: aws.Int64(writeCapacityUnits), }, }, + { + IndexName: aws.String(blobMinibatchMappingIndexName), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("BlobHash"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("SK"), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + }, }, ProvisionedThroughput: &types.ProvisionedThroughput{ ReadCapacityUnits: aws.Int64(readCapacityUnits), @@ -150,6 +177,18 @@ func MarshalDispersalResponse(response *batcher.DispersalResponse) (map[string]t return fields, nil } +func MarshalBlobMinibatchMapping(blobMinibatchMapping *batcher.BlobMinibatchMapping) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(*blobMinibatchMapping) + if err != nil { + return nil, err + } + fields["BatchID"] = &types.AttributeValueMemberS{Value: blobMinibatchMapping.BatchID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: blobMinibatchMappingSKPrefix + fmt.Sprintf("%s#%s#%d", blobMinibatchMapping.BlobKey.MetadataHash, blobMinibatchMapping.BatchID, blobMinibatchMapping.BlobIndex)} + fields["BlobHash"] = &types.AttributeValueMemberS{Value: blobMinibatchMapping.BlobKey.BlobHash} + fields["MetadataHash"] = &types.AttributeValueMemberS{Value: blobMinibatchMapping.BlobKey.MetadataHash} + return fields, nil +} + func UnmarshalBatchID(item commondynamodb.Item) (*uuid.UUID, error) { type BatchID struct { BatchID string @@ -169,6 +208,16 @@ func UnmarshalBatchID(item commondynamodb.Item) (*uuid.UUID, error) { return &batchID, nil } +func UnmarshalBlobKey(item commondynamodb.Item) (*disperser.BlobKey, error) { + blobKey := disperser.BlobKey{} + err := attributevalue.UnmarshalMap(item, &blobKey) + if err != nil { + return nil, err + } + + return &blobKey, nil +} + func UnmarshalOperatorID(item commondynamodb.Item) (*core.OperatorID, error) { type OperatorID struct { OperatorID string @@ -268,6 +317,28 @@ func UnmarshalDispersalResponse(item commondynamodb.Item) (*batcher.DispersalRes return &response, nil } +func UnmarshalBlobMinibatchMapping(item commondynamodb.Item) (*batcher.BlobMinibatchMapping, error) { + blobMinibatchMapping := batcher.BlobMinibatchMapping{} + err := attributevalue.UnmarshalMap(item, &blobMinibatchMapping) + if err != nil { + return nil, err + } + + batchID, err := UnmarshalBatchID(item) + if err != nil { + return nil, err + } + blobMinibatchMapping.BatchID = *batchID + + blobKey, err := UnmarshalBlobKey(item) + if err != nil { + return nil, err + } + blobMinibatchMapping.BlobKey = blobKey + + return &blobMinibatchMapping, nil +} + func (m *MinibatchStore) PutBatch(ctx context.Context, batch *batcher.BatchRecord) error { item, err := MarshalBatchRecord(batch) if err != nil { @@ -304,6 +375,27 @@ func (m *MinibatchStore) PutDispersalResponse(ctx context.Context, response *bat return m.dynamoDBClient.PutItem(ctx, m.tableName, item) } +func (m *MinibatchStore) PutBlobMinibatchMappings(ctx context.Context, blobMinibatchMappings []*batcher.BlobMinibatchMapping) error { + items := make([]map[string]types.AttributeValue, len(blobMinibatchMappings)) + var err error + for i, blobMinibatchMapping := range blobMinibatchMappings { + items[i], err = MarshalBlobMinibatchMapping(blobMinibatchMapping) + if err != nil { + return err + } + } + + failedItems, err := m.dynamoDBClient.PutItems(ctx, m.tableName, items) + if err != nil { + return err + } + if len(failedItems) > 0 { + return fmt.Errorf("failed to put blob minibatch mappings: %v", failedItems) + } + + return nil +} + func (m *MinibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batcher.BatchRecord, error) { item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ "BatchID": &types.AttributeValueMemberS{ @@ -618,3 +710,28 @@ func (m *MinibatchStore) GetMinibatchDispersalResponses(ctx context.Context, bat return responses, nil } + +func (m *MinibatchStore) GetBlobMinibatchMappings(ctx context.Context, blobKey disperser.BlobKey) ([]*batcher.BlobMinibatchMapping, error) { + items, err := m.dynamoDBClient.QueryIndex(ctx, m.tableName, blobMinibatchMappingIndexName, "BlobHash = :blobHash AND begins_with(SK, :prefix)", commondynamodb.ExpresseionValues{ + ":blobHash": &types.AttributeValueMemberS{ + Value: blobKey.BlobHash, + }, + ":prefix": &types.AttributeValueMemberS{ + Value: blobMinibatchMappingSKPrefix + blobKey.MetadataHash, + }, + }) + if err != nil { + return nil, err + } + + blobMinibatchMappings := make([]*batcher.BlobMinibatchMapping, len(items)) + for i, item := range items { + blobMinibatchMappings[i], err = UnmarshalBlobMinibatchMapping(item) + if err != nil { + m.logger.Errorf("failed to unmarshal blob minibatch mapping at index %d: %v", i, err) + return nil, err + } + } + + return blobMinibatchMappings, nil +} diff --git a/disperser/batcher/batchstore/minibatch_store_test.go b/disperser/batcher/batchstore/minibatch_store_test.go index 5e3d4c05e7..c17ce73d72 100644 --- a/disperser/batcher/batchstore/minibatch_store_test.go +++ b/disperser/batcher/batchstore/minibatch_store_test.go @@ -11,10 +11,14 @@ import ( "github.com/Layr-Labs/eigenda/common/aws/dynamodb" test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils" "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/disperser/batcher" "github.com/Layr-Labs/eigenda/disperser/batcher/batchstore" + "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigenda/inabox/deploy" "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/consensys/gnark-crypto/ecc/bn254" + "github.com/consensys/gnark-crypto/ecc/bn254/fp" gcommon "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" "github.com/ory/dockertest/v3" @@ -371,3 +375,97 @@ func TestDispersalStatus(t *testing.T) { assert.NoError(t, err) assert.True(t, dispersed) } + +func TestGetBlobMinibatchMappings(t *testing.T) { + ctx := context.Background() + batchID, err := uuid.NewV7() + assert.NoError(t, err) + blobKey := disperser.BlobKey{ + BlobHash: "blob-hash", + MetadataHash: "metadata-hash", + } + var commitX, commitY, lengthX, lengthY fp.Element + _, err = commitX.SetString("21661178944771197726808973281966770251114553549453983978976194544185382599016") + assert.NoError(t, err) + _, err = commitY.SetString("9207254729396071334325696286939045899948985698134704137261649190717970615186") + assert.NoError(t, err) + commitment := &encoding.G1Commitment{ + X: commitX, + Y: commitY, + } + _, err = lengthX.SetString("18730744272503541936633286178165146673834730535090946570310418711896464442549") + assert.NoError(t, err) + _, err = lengthY.SetString("15356431458378126778840641829778151778222945686256112821552210070627093656047") + assert.NoError(t, err) + var lengthXA0, lengthXA1, lengthYA0, lengthYA1 fp.Element + _, err = lengthXA0.SetString("10857046999023057135944570762232829481370756359578518086990519993285655852781") + assert.NoError(t, err) + _, err = lengthXA1.SetString("11559732032986387107991004021392285783925812861821192530917403151452391805634") + assert.NoError(t, err) + _, err = lengthYA0.SetString("8495653923123431417604973247489272438418190587263600148770280649306958101930") + assert.NoError(t, err) + _, err = lengthYA1.SetString("4082367875863433681332203403145435568316851327593401208105741076214120093531") + assert.NoError(t, err) + + var lengthProof, lengthCommitment bn254.G2Affine + lengthProof.X.A0 = lengthXA0 + lengthProof.X.A1 = lengthXA1 + lengthProof.Y.A0 = lengthYA0 + lengthProof.Y.A1 = lengthYA1 + + lengthCommitment = lengthProof + expectedDataLength := 111 + expectedChunkLength := uint(222) + err = minibatchStore.PutBlobMinibatchMappings(ctx, []*batcher.BlobMinibatchMapping{ + { + BlobKey: &blobKey, + BatchID: batchID, + MinibatchIndex: 11, + BlobIndex: 22, + BlobCommitments: encoding.BlobCommitments{ + Commitment: commitment, + LengthCommitment: (*encoding.G2Commitment)(&lengthCommitment), + Length: uint(expectedDataLength), + LengthProof: (*encoding.LengthProof)(&lengthProof), + }, + BlobQuorumInfos: []*core.BlobQuorumInfo{ + { + ChunkLength: expectedChunkLength, + SecurityParam: core.SecurityParam{ + QuorumID: 1, + ConfirmationThreshold: 55, + AdversaryThreshold: 33, + QuorumRate: 123, + }, + }, + }, + }, + }) + assert.NoError(t, err) + + mapping, err := minibatchStore.GetBlobMinibatchMappings(ctx, blobKey) + assert.NoError(t, err) + assert.Equal(t, 1, len(mapping)) + assert.Equal(t, &blobKey, mapping[0].BlobKey) + assert.Equal(t, batchID, mapping[0].BatchID) + assert.Equal(t, uint(11), mapping[0].MinibatchIndex) + assert.Equal(t, uint(22), mapping[0].BlobIndex) + assert.Equal(t, commitment, mapping[0].BlobCommitments.Commitment) + lengthCommitmentBytes, err := mapping[0].BlobCommitments.LengthCommitment.Serialize() + assert.NoError(t, err) + expectedLengthCommitmentBytes := lengthCommitment.Bytes() + assert.Equal(t, expectedLengthCommitmentBytes[:], lengthCommitmentBytes[:]) + assert.Equal(t, expectedDataLength, int(mapping[0].BlobCommitments.Length)) + lengthProofBytes, err := mapping[0].BlobCommitments.LengthProof.Serialize() + assert.NoError(t, err) + expectedLengthProofBytes := lengthProof.Bytes() + assert.Equal(t, expectedLengthProofBytes[:], lengthProofBytes[:]) + assert.Len(t, mapping[0].BlobQuorumInfos, 1) + assert.Equal(t, expectedChunkLength, mapping[0].BlobQuorumInfos[0].ChunkLength) + assert.Equal(t, core.SecurityParam{ + QuorumID: 1, + ConfirmationThreshold: 55, + AdversaryThreshold: 33, + QuorumRate: 123, + }, mapping[0].BlobQuorumInfos[0].SecurityParam) +} diff --git a/disperser/batcher/inmem/minibatch_store.go b/disperser/batcher/inmem/minibatch_store.go index 132d090a67..b0a261b7f7 100644 --- a/disperser/batcher/inmem/minibatch_store.go +++ b/disperser/batcher/inmem/minibatch_store.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/disperser/batcher" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/google/uuid" @@ -23,6 +24,8 @@ type minibatchStore struct { DispersalRequests map[uuid.UUID]map[uint][]*batcher.DispersalRequest // DispersalResponses maps batch IDs to a map from minibatch indices to dispersal responses DispersalResponses map[uuid.UUID]map[uint][]*batcher.DispersalResponse + // BlobMinibatchMapping maps blob key to a map from batch ID to minibatch records + BlobMinibatchMapping map[string]map[uuid.UUID]*batcher.BlobMinibatchMapping mu sync.RWMutex logger logging.Logger @@ -32,10 +35,11 @@ var _ batcher.MinibatchStore = (*minibatchStore)(nil) func NewMinibatchStore(logger logging.Logger) batcher.MinibatchStore { return &minibatchStore{ - BatchRecords: make(map[uuid.UUID]*batcher.BatchRecord), - MinibatchRecords: make(map[uuid.UUID]map[uint]*batcher.MinibatchRecord), - DispersalRequests: make(map[uuid.UUID]map[uint][]*batcher.DispersalRequest), - DispersalResponses: make(map[uuid.UUID]map[uint][]*batcher.DispersalResponse), + BatchRecords: make(map[uuid.UUID]*batcher.BatchRecord), + MinibatchRecords: make(map[uuid.UUID]map[uint]*batcher.MinibatchRecord), + DispersalRequests: make(map[uuid.UUID]map[uint][]*batcher.DispersalRequest), + DispersalResponses: make(map[uuid.UUID]map[uint][]*batcher.DispersalResponse), + BlobMinibatchMapping: make(map[string]map[uuid.UUID]*batcher.BlobMinibatchMapping), logger: logger, } @@ -234,6 +238,41 @@ func (m *minibatchStore) GetMinibatchDispersalResponses(ctx context.Context, bat return m.DispersalResponses[batchID][minibatchIndex], nil } +func (m *minibatchStore) GetBlobMinibatchMappings(ctx context.Context, blobKey disperser.BlobKey) ([]*batcher.BlobMinibatchMapping, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + if _, ok := m.BlobMinibatchMapping[blobKey.String()]; !ok { + return nil, nil + } + + res := make([]*batcher.BlobMinibatchMapping, 0) + for _, blobMinibatchMapping := range m.BlobMinibatchMapping[blobKey.String()] { + res = append(res, blobMinibatchMapping) + } + + return res, nil +} + +func (m *minibatchStore) PutBlobMinibatchMappings(ctx context.Context, blobMinibatchMappings []*batcher.BlobMinibatchMapping) error { + m.mu.Lock() + defer m.mu.Unlock() + + for _, blobMinibatchMapping := range blobMinibatchMappings { + if blobMinibatchMapping.BlobKey == nil { + return errors.New("blob key is nil") + } + blobKey := blobMinibatchMapping.BlobKey.String() + + if _, ok := m.BlobMinibatchMapping[blobKey]; !ok { + m.BlobMinibatchMapping[blobKey] = make(map[uuid.UUID]*batcher.BlobMinibatchMapping) + } + + m.BlobMinibatchMapping[blobKey][blobMinibatchMapping.BatchID] = blobMinibatchMapping + } + return nil +} + func (m *minibatchStore) GetLatestFormedBatch(ctx context.Context) (batch *batcher.BatchRecord, minibatches []*batcher.MinibatchRecord, err error) { m.mu.RLock() defer m.mu.RUnlock() diff --git a/disperser/batcher/inmem/minibatch_store_test.go b/disperser/batcher/inmem/minibatch_store_test.go index 29b30294e4..b47af0a7a7 100644 --- a/disperser/batcher/inmem/minibatch_store_test.go +++ b/disperser/batcher/inmem/minibatch_store_test.go @@ -6,8 +6,12 @@ import ( "time" "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/disperser/batcher" "github.com/Layr-Labs/eigenda/disperser/batcher/inmem" + "github.com/Layr-Labs/eigenda/encoding" + "github.com/consensys/gnark-crypto/ecc/bn254" + "github.com/consensys/gnark-crypto/ecc/bn254/fp" gcommon "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" "github.com/stretchr/testify/assert" @@ -157,3 +161,98 @@ func TestPutDispersalResponse(t *testing.T) { assert.NoError(t, err) assert.Equal(t, resp2, resp) } + +func TestPutBlobMinibatchMappings(t *testing.T) { + s := newMinibatchStore() + ctx := context.Background() + batchID, err := uuid.NewV7() + assert.NoError(t, err) + blobKey := disperser.BlobKey{ + BlobHash: "blob-hash", + MetadataHash: "metadata-hash", + } + var commitX, commitY, lengthX, lengthY fp.Element + _, err = commitX.SetString("21661178944771197726808973281966770251114553549453983978976194544185382599016") + assert.NoError(t, err) + _, err = commitY.SetString("9207254729396071334325696286939045899948985698134704137261649190717970615186") + assert.NoError(t, err) + commitment := &encoding.G1Commitment{ + X: commitX, + Y: commitY, + } + _, err = lengthX.SetString("18730744272503541936633286178165146673834730535090946570310418711896464442549") + assert.NoError(t, err) + _, err = lengthY.SetString("15356431458378126778840641829778151778222945686256112821552210070627093656047") + assert.NoError(t, err) + var lengthXA0, lengthXA1, lengthYA0, lengthYA1 fp.Element + _, err = lengthXA0.SetString("10857046999023057135944570762232829481370756359578518086990519993285655852781") + assert.NoError(t, err) + _, err = lengthXA1.SetString("11559732032986387107991004021392285783925812861821192530917403151452391805634") + assert.NoError(t, err) + _, err = lengthYA0.SetString("8495653923123431417604973247489272438418190587263600148770280649306958101930") + assert.NoError(t, err) + _, err = lengthYA1.SetString("4082367875863433681332203403145435568316851327593401208105741076214120093531") + assert.NoError(t, err) + + var lengthProof, lengthCommitment bn254.G2Affine + lengthProof.X.A0 = lengthXA0 + lengthProof.X.A1 = lengthXA1 + lengthProof.Y.A0 = lengthYA0 + lengthProof.Y.A1 = lengthYA1 + + lengthCommitment = lengthProof + expectedDataLength := 111 + expectedChunkLength := uint(222) + err = s.PutBlobMinibatchMappings(ctx, []*batcher.BlobMinibatchMapping{ + { + BlobKey: &blobKey, + BatchID: batchID, + MinibatchIndex: 11, + BlobIndex: 22, + BlobCommitments: encoding.BlobCommitments{ + Commitment: commitment, + LengthCommitment: (*encoding.G2Commitment)(&lengthCommitment), + Length: uint(expectedDataLength), + LengthProof: (*encoding.LengthProof)(&lengthProof), + }, + BlobQuorumInfos: []*core.BlobQuorumInfo{ + { + ChunkLength: expectedChunkLength, + SecurityParam: core.SecurityParam{ + QuorumID: 1, + ConfirmationThreshold: 55, + AdversaryThreshold: 33, + QuorumRate: 123, + }, + }, + }, + }, + }) + assert.NoError(t, err) + + mapping, err := s.GetBlobMinibatchMappings(ctx, blobKey) + assert.NoError(t, err) + assert.Equal(t, 1, len(mapping)) + assert.Equal(t, &blobKey, mapping[0].BlobKey) + assert.Equal(t, batchID, mapping[0].BatchID) + assert.Equal(t, uint(11), mapping[0].MinibatchIndex) + assert.Equal(t, uint(22), mapping[0].BlobIndex) + assert.Equal(t, commitment, mapping[0].BlobCommitments.Commitment) + lengthCommitmentBytes, err := mapping[0].BlobCommitments.LengthCommitment.Serialize() + assert.NoError(t, err) + expectedLengthCommitmentBytes := lengthCommitment.Bytes() + assert.Equal(t, expectedLengthCommitmentBytes[:], lengthCommitmentBytes[:]) + assert.Equal(t, expectedDataLength, int(mapping[0].BlobCommitments.Length)) + lengthProofBytes, err := mapping[0].BlobCommitments.LengthProof.Serialize() + assert.NoError(t, err) + expectedLengthProofBytes := lengthProof.Bytes() + assert.Equal(t, expectedLengthProofBytes[:], lengthProofBytes[:]) + assert.Len(t, mapping[0].BlobQuorumInfos, 1) + assert.Equal(t, expectedChunkLength, mapping[0].BlobQuorumInfos[0].ChunkLength) + assert.Equal(t, core.SecurityParam{ + QuorumID: 1, + ConfirmationThreshold: 55, + AdversaryThreshold: 33, + QuorumRate: 123, + }, mapping[0].BlobQuorumInfos[0].SecurityParam) +} diff --git a/disperser/batcher/minibatch_store.go b/disperser/batcher/minibatch_store.go index 5f27c004a2..f909d9c6ea 100644 --- a/disperser/batcher/minibatch_store.go +++ b/disperser/batcher/minibatch_store.go @@ -5,6 +5,8 @@ import ( "time" "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigenda/encoding" gcommon "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" ) @@ -66,6 +68,15 @@ type DispersalResponse struct { Error error } +type BlobMinibatchMapping struct { + BlobKey *disperser.BlobKey `dynamodbav:"-"` + BatchID uuid.UUID `dynamodbav:"-"` + MinibatchIndex uint + BlobIndex uint + encoding.BlobCommitments + BlobQuorumInfos []*core.BlobQuorumInfo +} + type MinibatchStore interface { PutBatch(ctx context.Context, batch *BatchRecord) error GetBatch(ctx context.Context, batchID uuid.UUID) (*BatchRecord, error) @@ -80,7 +91,8 @@ type MinibatchStore interface { PutDispersalResponse(ctx context.Context, response *DispersalResponse) error GetDispersalResponse(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*DispersalResponse, error) GetMinibatchDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*DispersalResponse, error) - + GetBlobMinibatchMappings(ctx context.Context, blobKey disperser.BlobKey) ([]*BlobMinibatchMapping, error) + PutBlobMinibatchMappings(ctx context.Context, blobMinibatchMappings []*BlobMinibatchMapping) error // GetLatestFormedBatch returns the latest batch that has been formed. // If there is no formed batch, it returns nil. // It also returns the minibatches that belong to the batch in the ascending order of minibatch index.