Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[minibatch] Create minibatch from encoding streamer #637

Merged
merged 1 commit into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions disperser/batcher/encoded_blob_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,32 @@ func (e *encodedBlobStore) DeleteEncodingResult(blobKey disperser.BlobKey, quoru
e.encodedResultSize -= getChunksSize(encodedResult)
}

// PopLatestEncodingResults returns all the encoded results that are pending dispersal and deletes them along with stale results that are older than the given reference block
func (e *encodedBlobStore) PopLatestEncodingResults(refBlockNumber uint) []*EncodingResult {
	e.mu.Lock()
	defer e.mu.Unlock()

	fetched := make([]*EncodingResult, 0)
	staleCount := 0
	for k, encodedResult := range e.encoded {
		if encodedResult.ReferenceBlockNumber == refBlockNumber {
			fetched = append(fetched, encodedResult)
			// Deleting from a map while ranging over it is safe: https://go.dev/doc/effective_go#for
			delete(e.encoded, k)
			e.encodedResultSize -= getChunksSize(encodedResult)
		} else if encodedResult.ReferenceBlockNumber < refBlockNumber {
			// Stale result from an earlier batching iteration; discard without returning it.
			delete(e.encoded, k)
			staleCount++
			e.encodedResultSize -= getChunksSize(encodedResult)
		} else {
			// A result newer than refBlockNumber should not exist; use distinct log keys so
			// both values survive structured logging (the original logged "refBlockNumber" twice).
			e.logger.Error("unexpected case", "resultRefBlockNumber", encodedResult.ReferenceBlockNumber, "refBlockNumber", refBlockNumber)
		}
	}
	e.logger.Debug("consumed encoded results", "fetched", len(fetched), "stale", staleCount, "refBlockNumber", refBlockNumber, "encodedSize", e.encodedResultSize)

	return fetched
}

// GetNewAndDeleteStaleEncodingResults returns all the fresh encoded results that are pending dispersal, and deletes all the stale results that are older than the given block number
func (e *encodedBlobStore) GetNewAndDeleteStaleEncodingResults(blockNumber uint) []*EncodingResult {
e.mu.Lock()
Expand Down
148 changes: 146 additions & 2 deletions disperser/batcher/encoding_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,7 @@ func (e *EncodingStreamer) RequestEncodingForBlob(ctx context.Context, metadata
}
})
e.EncodedBlobstore.PutEncodingRequest(blobKey, res.BlobQuorumInfo.QuorumID)

}

}

func (e *EncodingStreamer) ProcessEncodedBlobs(ctx context.Context, result EncodingResultOrStatus) error {
Expand Down Expand Up @@ -420,6 +418,152 @@ func (e *EncodingStreamer) ProcessEncodedBlobs(ctx context.Context, result Encod
return nil
}

// UpdateReferenecBlock sets the streamer's reference block to currentBlockNumber minus the
// finalization delay and discards encoded results tied to the previous reference block.
// It returns an error if the update would move the reference block backwards.
// NOTE(review): the name carries a historical typo ("Referenec"); kept for caller compatibility.
func (e *EncodingStreamer) UpdateReferenecBlock(currentBlockNumber uint) error {
	blockNumber := currentBlockNumber
	if blockNumber > e.FinalizationBlockDelay {
		blockNumber -= e.FinalizationBlockDelay
	}
	// Acquire the lock BEFORE reading ReferenceBlockNumber: the original validated against
	// an unguarded read, racing with concurrent updates between the check and the write.
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.ReferenceBlockNumber > blockNumber {
		return fmt.Errorf("reference block number is being updated to a lower value: from %d to %d", e.ReferenceBlockNumber, blockNumber)
	}
	if e.ReferenceBlockNumber < blockNumber {
		// Wipe out the encoding results based on previous reference block number.
		// The popped results are intentionally dropped; pending blobs are re-requested later.
		_ = e.EncodedBlobstore.PopLatestEncodingResults(e.ReferenceBlockNumber)
	}
	e.ReferenceBlockNumber = blockNumber
	return nil
}

// CreateMinibatch builds a batch from all encoded results at the current reference block.
// It cancels outstanding encoding requests, pops (and prunes stale) encoding results,
// groups them per blob, transitions each blob to Dispersing, and assembles the batch
// header and operator state. Returns errNoEncodedResults when nothing is dispersable.
func (e *EncodingStreamer) CreateMinibatch(ctx context.Context) (*batch, error) {
	e.mu.Lock()
	defer e.mu.Unlock()
	// Cancel outstanding encoding requests
	// Assumption: `CreateMinibatch` will be called at an interval longer than time it takes to encode a single blob
	if len(e.encodingCtxCancelFuncs) > 0 {
		e.logger.Info("canceling outstanding encoding requests", "count", len(e.encodingCtxCancelFuncs))
		for _, cancel := range e.encodingCtxCancelFuncs {
			cancel()
		}
		e.encodingCtxCancelFuncs = make([]context.CancelFunc, 0)
	}

	// Pop the latest encoded blobs and delete any stale results that are not from the current batching iteration (i.e. that has different reference block number)
	// If any pending encoded results are discarded here, it will be re-requested in the next iteration
	encodedResults := e.EncodedBlobstore.PopLatestEncodingResults(e.ReferenceBlockNumber)

	e.logger.Info("creating a batch...", "numBlobs", len(encodedResults), "refblockNumber", e.ReferenceBlockNumber)
	if len(encodedResults) == 0 {
		return nil, errNoEncodedResults
	}

	encodedBlobByKey := make(map[disperser.BlobKey]core.EncodedBlob)
	blobQuorums := make(map[disperser.BlobKey][]*core.BlobQuorumInfo)
	blobHeaderByKey := make(map[disperser.BlobKey]*core.BlobHeader)
	metadataByKey := make(map[disperser.BlobKey]*disperser.BlobMetadata)
	for i := range encodedResults {
		// each result represent an encoded result per (blob, quorum param)
		// if the same blob has been dispersed multiple time with different security params,
		// there will be multiple encoded results for that (blob, quorum)
		result := encodedResults[i]
		blobKey := result.BlobMetadata.GetBlobKey()
		if _, ok := encodedBlobByKey[blobKey]; !ok {
			metadataByKey[blobKey] = result.BlobMetadata
			blobQuorums[blobKey] = make([]*core.BlobQuorumInfo, 0)
			blobHeader := &core.BlobHeader{
				BlobCommitments: *result.Commitment,
			}
			blobHeaderByKey[blobKey] = blobHeader
			encodedBlobByKey[blobKey] = core.EncodedBlob{
				BlobHeader:        blobHeader,
				BundlesByOperator: make(map[core.OperatorID]core.Bundles),
			}
		}

		// Populate the assigned bundles
		for opID, assignment := range result.Assignments {
			bundles, ok := encodedBlobByKey[blobKey].BundlesByOperator[opID]
			if !ok {
				encodedBlobByKey[blobKey].BundlesByOperator[opID] = make(core.Bundles)
				bundles = encodedBlobByKey[blobKey].BundlesByOperator[opID]
			}
			bundles[result.BlobQuorumInfo.QuorumID] = append(bundles[result.BlobQuorumInfo.QuorumID], result.Chunks[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...)
		}

		blobQuorums[blobKey] = append(blobQuorums[blobKey], result.BlobQuorumInfo)
	}

	// Populate the blob quorum infos
	for blobKey, encodedBlob := range encodedBlobByKey {
		encodedBlob.BlobHeader.QuorumInfos = blobQuorums[blobKey]
	}

	for blobKey, metadata := range metadataByKey {
		quorumPresent := make(map[core.QuorumID]bool)
		for _, quorum := range blobQuorums[blobKey] {
			quorumPresent[quorum.QuorumID] = true
		}
		// Check if the blob has valid quorums. If any of the quorums are not valid, delete the blobKey
		for _, quorum := range metadata.RequestMetadata.SecurityParams {
			_, ok := quorumPresent[quorum.QuorumID]
			if !ok {
				// Delete the blobKey. These encoded blobs will be automatically removed by the next run of
				// RequestEncoding
				delete(metadataByKey, blobKey)
				break
			}
		}
	}

	if len(metadataByKey) == 0 {
		return nil, errNoEncodedResults
	}

	// Transform maps to slices so orders in different slices match
	encodedBlobs := make([]core.EncodedBlob, len(metadataByKey))
	blobHeaders := make([]*core.BlobHeader, len(metadataByKey))
	metadatas := make([]*disperser.BlobMetadata, len(metadataByKey))
	i := 0
	for key := range metadataByKey {
		err := e.transitionBlobToDispersing(ctx, metadataByKey[key])
		if err != nil {
			continue
		}
		encodedBlobs[i] = encodedBlobByKey[key]
		blobHeaders[i] = blobHeaderByKey[key]
		metadatas[i] = metadataByKey[key]
		i++
	}
	// BUGFIX: if any transition failed above, the tails of these slices are zero-valued
	// placeholders; truncate to the filled prefix so nil headers/metadata never reach
	// getOperatorState or SetBatchRoot.
	encodedBlobs = encodedBlobs[:i]
	blobHeaders = blobHeaders[:i]
	metadatas = metadatas[:i]
	if i == 0 {
		return nil, errNoEncodedResults
	}

	timeoutCtx, cancel := context.WithTimeout(context.Background(), e.ChainStateTimeout)
	defer cancel()

	state, err := e.getOperatorState(timeoutCtx, metadatas, e.ReferenceBlockNumber)
	if err != nil {
		return nil, err
	}

	// Populate the batch header
	batchHeader := &core.BatchHeader{
		ReferenceBlockNumber: e.ReferenceBlockNumber,
		BatchRoot:            [32]byte{},
	}

	_, err = batchHeader.SetBatchRoot(blobHeaders)
	if err != nil {
		return nil, err
	}

	return &batch{
		EncodedBlobs: encodedBlobs,
		BatchHeader:  batchHeader,
		BlobHeaders:  blobHeaders,
		BlobMetadata: metadatas,
		State:        state,
	}, nil
}

// CreateBatch makes a batch from all blobs in the encoded blob store.
// If successful, it returns a batch, and updates the reference block number for next batch to use.
// Otherwise, it returns an error and keeps the blobs in the encoded blob store.
Expand Down
168 changes: 168 additions & 0 deletions disperser/batcher/encoding_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,3 +749,171 @@ func TestGetBatch(t *testing.T) {
assert.Contains(t, batch.BlobMetadata, metadata1)
assert.Contains(t, batch.BlobMetadata, metadata2)
}

// TestCreateMinibatch verifies that CreateMinibatch consumes all pending encoding results
// at the current reference block, transitions blobs to Dispersing, and produces a batch
// whose headers, quorum infos, and bundles match the requested security params.
func TestCreateMinibatch(t *testing.T) {
	encodingStreamer, c := createEncodingStreamer(t, 10, 1e12, streamerConfig)
	ctx := context.Background()

	// put 2 blobs in the blobstore
	blob1 := makeTestBlob([]*core.SecurityParam{{
		QuorumID:              0,
		AdversaryThreshold:    80,
		ConfirmationThreshold: 100,
	}, {
		QuorumID:              1,
		AdversaryThreshold:    70,
		ConfirmationThreshold: 95,
	}})
	blob2 := makeTestBlob([]*core.SecurityParam{{
		QuorumID:              2,
		AdversaryThreshold:    75,
		ConfirmationThreshold: 100,
	}})
	metadataKey1, err := c.blobStore.StoreBlob(ctx, &blob1, uint64(time.Now().UnixNano()))
	assert.Nil(t, err)
	metadata1, err := c.blobStore.GetBlobMetadata(ctx, metadataKey1)
	assert.Nil(t, err)
	assert.Equal(t, disperser.Processing, metadata1.BlobStatus)
	metadataKey2, err := c.blobStore.StoreBlob(ctx, &blob2, uint64(time.Now().UnixNano()))
	assert.Nil(t, err)
	metadata2, err := c.blobStore.GetBlobMetadata(ctx, metadataKey2)
	assert.Nil(t, err)
	assert.Equal(t, disperser.Processing, metadata2.BlobStatus)

	// request encoding
	out := make(chan batcher.EncodingResultOrStatus)
	err = encodingStreamer.RequestEncoding(context.Background(), out)
	assert.Nil(t, err)
	isRequested := encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey1, core.QuorumID(0), 10)
	assert.True(t, isRequested)
	isRequested = encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey1, core.QuorumID(1), 10)
	assert.True(t, isRequested)
	isRequested = encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey2, core.QuorumID(2), 10)
	assert.True(t, isRequested)

	err = encodingStreamer.ProcessEncodedBlobs(context.Background(), <-out)
	assert.Nil(t, err)
	err = encodingStreamer.ProcessEncodedBlobs(context.Background(), <-out)
	assert.Nil(t, err)
	err = encodingStreamer.ProcessEncodedBlobs(context.Background(), <-out)
	assert.Nil(t, err)
	encodingStreamer.Pool.StopWait()

	isRequested = encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey1, core.QuorumID(0), 10)
	assert.True(t, isRequested)
	isRequested = encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey1, core.QuorumID(1), 10)
	assert.True(t, isRequested)
	isRequested = encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey2, core.QuorumID(2), 10)
	assert.True(t, isRequested)

	// get batch
	assert.Equal(t, encodingStreamer.ReferenceBlockNumber, uint(10))
	batch, err := encodingStreamer.CreateMinibatch(context.Background())
	assert.Nil(t, err)
	assert.NotNil(t, batch)
	assert.Equal(t, encodingStreamer.ReferenceBlockNumber, uint(10))
	metadata1, err = c.blobStore.GetBlobMetadata(ctx, metadataKey1)
	assert.Nil(t, err)
	assert.Equal(t, disperser.Dispersing, metadata1.BlobStatus)
	metadata2, err = c.blobStore.GetBlobMetadata(ctx, metadataKey2)
	assert.Equal(t, disperser.Dispersing, metadata2.BlobStatus)
	assert.Nil(t, err)
	res, err := encodingStreamer.EncodedBlobstore.GetEncodingResult(metadataKey1, core.QuorumID(0))
	assert.Nil(t, res)
	assert.ErrorContains(t, err, "GetEncodedBlob: no such key")
	res, err = encodingStreamer.EncodedBlobstore.GetEncodingResult(metadataKey1, core.QuorumID(1))
	assert.Nil(t, res)
	assert.ErrorContains(t, err, "GetEncodedBlob: no such key")
	// BUGFIX: blob2 was encoded for quorum 2 (not 0); query the quorum that actually held a
	// result so the assertion proves it was consumed rather than passing vacuously.
	res, err = encodingStreamer.EncodedBlobstore.GetEncodingResult(metadataKey2, core.QuorumID(2))
	assert.Nil(t, res)
	assert.ErrorContains(t, err, "GetEncodedBlob: no such key")

	err = encodingStreamer.UpdateReferenecBlock(15)
	assert.Nil(t, err)
	assert.Equal(t, encodingStreamer.ReferenceBlockNumber, uint(15))

	// Check BatchHeader
	assert.NotNil(t, batch.BatchHeader)
	assert.Greater(t, len(batch.BatchHeader.BatchRoot), 0)
	assert.Equal(t, batch.BatchHeader.ReferenceBlockNumber, uint(10))

	// Check State
	assert.NotNil(t, batch.State)

	// Check EncodedBlobs
	assert.Len(t, batch.EncodedBlobs, 2)
	assert.Len(t, batch.EncodedBlobs[0].BundlesByOperator, numOperators)

	var encodedBlob1 core.EncodedBlob
	var encodedBlob2 core.EncodedBlob
	for i := range batch.BlobHeaders {
		blobHeader := batch.BlobHeaders[i]
		if len(blobHeader.QuorumInfos) > 1 {
			encodedBlob1 = batch.EncodedBlobs[i]
			// batch.EncodedBlobs and batch.BlobMetadata should be in the same order
			assert.ElementsMatch(t, batch.BlobMetadata[i].RequestMetadata.SecurityParams, blob1.RequestHeader.SecurityParams)
		} else {
			encodedBlob2 = batch.EncodedBlobs[i]
			assert.ElementsMatch(t, batch.BlobMetadata[i].RequestMetadata.SecurityParams, blob2.RequestHeader.SecurityParams)
		}
	}
	assert.NotNil(t, encodedBlob1)
	assert.NotNil(t, encodedBlob2)

	assert.NotNil(t, encodedBlob1.BlobHeader)
	assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments)
	assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments.Commitment)
	assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments.LengthProof)
	assert.Equal(t, encodedBlob1.BlobHeader.BlobCommitments.Length, uint(48))
	assert.Len(t, encodedBlob1.BlobHeader.QuorumInfos, 2)
	assert.ElementsMatch(t, encodedBlob1.BlobHeader.QuorumInfos, []*core.BlobQuorumInfo{
		{
			SecurityParam: core.SecurityParam{
				QuorumID:              0,
				AdversaryThreshold:    80,
				ConfirmationThreshold: 100,
			},
			ChunkLength: 16,
		},
		{
			SecurityParam: core.SecurityParam{
				QuorumID:              1,
				AdversaryThreshold:    70,
				ConfirmationThreshold: 95,
			},
			ChunkLength: 8,
		},
	})

	assert.Contains(t, batch.BlobHeaders, encodedBlob1.BlobHeader)
	for _, bundles := range encodedBlob1.BundlesByOperator {
		assert.Len(t, bundles, 2)
		assert.Greater(t, len(bundles[0]), 0)
		assert.Greater(t, len(bundles[1]), 0)
		break
	}

	assert.NotNil(t, encodedBlob2.BlobHeader)
	assert.NotNil(t, encodedBlob2.BlobHeader.BlobCommitments)
	assert.NotNil(t, encodedBlob2.BlobHeader.BlobCommitments.Commitment)
	assert.NotNil(t, encodedBlob2.BlobHeader.BlobCommitments.LengthProof)
	assert.Equal(t, encodedBlob2.BlobHeader.BlobCommitments.Length, uint(48))
	assert.Len(t, encodedBlob2.BlobHeader.QuorumInfos, 1)
	assert.ElementsMatch(t, encodedBlob2.BlobHeader.QuorumInfos, []*core.BlobQuorumInfo{{
		SecurityParam: core.SecurityParam{
			QuorumID:              2,
			AdversaryThreshold:    75,
			ConfirmationThreshold: 100,
		},
		ChunkLength: 8,
	}})
	for _, bundles := range encodedBlob2.BundlesByOperator {
		assert.Len(t, bundles, 1)
		assert.Greater(t, len(bundles[core.QuorumID(2)]), 0)
		break
	}
	assert.Len(t, batch.BlobHeaders, 2)
	assert.Len(t, batch.BlobMetadata, 2)
	assert.Contains(t, batch.BlobMetadata, metadata1)
	assert.Contains(t, batch.BlobMetadata, metadata2)
}
Loading