From fdae9895a111deaa27e86ba5b82abfe9f5ba190e Mon Sep 17 00:00:00 2001 From: Ian Shim Date: Thu, 11 Jul 2024 18:23:09 -0700 Subject: [PATCH] encoding streamer create minibatch --- disperser/batcher/encoded_blob_store.go | 26 +++ disperser/batcher/encoding_streamer.go | 148 ++++++++++++++++- disperser/batcher/encoding_streamer_test.go | 168 ++++++++++++++++++++ 3 files changed, 340 insertions(+), 2 deletions(-) diff --git a/disperser/batcher/encoded_blob_store.go b/disperser/batcher/encoded_blob_store.go index f2e2f8b8a3..5ce98d6d71 100644 --- a/disperser/batcher/encoded_blob_store.go +++ b/disperser/batcher/encoded_blob_store.go @@ -133,6 +133,32 @@ func (e *encodedBlobStore) DeleteEncodingResult(blobKey disperser.BlobKey, quoru e.encodedResultSize -= getChunksSize(encodedResult) } +// PopLatestEncodingResults returns all the encoded results that are pending dispersal and deletes them along with stale results that are older than the given reference block +func (e *encodedBlobStore) PopLatestEncodingResults(refBlockNumber uint) []*EncodingResult { + e.mu.Lock() + defer e.mu.Unlock() + + fetched := make([]*EncodingResult, 0) + staleCount := 0 + for k, encodedResult := range e.encoded { + if encodedResult.ReferenceBlockNumber == refBlockNumber { + fetched = append(fetched, encodedResult) + // this is safe: https://go.dev/doc/effective_go#for + delete(e.encoded, k) + e.encodedResultSize -= getChunksSize(encodedResult) + } else if encodedResult.ReferenceBlockNumber < refBlockNumber { + delete(e.encoded, k) + staleCount++ + e.encodedResultSize -= getChunksSize(encodedResult) + } else { + e.logger.Error("unexpected case", "encodedRefBlockNumber", encodedResult.ReferenceBlockNumber, "refBlockNumber", refBlockNumber) + } + } + e.logger.Debug("consumed encoded results", "fetched", len(fetched), "stale", staleCount, "refBlockNumber", refBlockNumber, "encodedSize", e.encodedResultSize) + + return fetched +} + // GetNewAndDeleteStaleEncodingResults returns all the fresh encoded 
results that are pending dispersal, and deletes all the stale results that are older than the given block number func (e *encodedBlobStore) GetNewAndDeleteStaleEncodingResults(blockNumber uint) []*EncodingResult { e.mu.Lock() defer e.mu.Unlock() diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index 800e90ed2c..78c62ff0da 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -387,9 +387,7 @@ func (e *EncodingStreamer) RequestEncodingForBlob(ctx context.Context, metadata } }) e.EncodedBlobstore.PutEncodingRequest(blobKey, res.BlobQuorumInfo.QuorumID) - } - } func (e *EncodingStreamer) ProcessEncodedBlobs(ctx context.Context, result EncodingResultOrStatus) error { @@ -420,6 +418,152 @@ func (e *EncodingStreamer) ProcessEncodedBlobs(ctx context.Context, result Encod return nil } +func (e *EncodingStreamer) UpdateReferenecBlock(currentBlockNumber uint) error { + e.mu.Lock() + defer e.mu.Unlock() + blockNumber := currentBlockNumber + if blockNumber > e.FinalizationBlockDelay { + blockNumber -= e.FinalizationBlockDelay + } + if e.ReferenceBlockNumber > blockNumber { + return fmt.Errorf("reference block number is being updated to a lower value: from %d to %d", e.ReferenceBlockNumber, blockNumber) + } + if e.ReferenceBlockNumber < blockNumber { + // Wipe out the encoding results based on previous reference block number + _ = e.EncodedBlobstore.PopLatestEncodingResults(e.ReferenceBlockNumber) + } + e.ReferenceBlockNumber = blockNumber + return nil +} + +func (e *EncodingStreamer) CreateMinibatch(ctx context.Context) (*batch, error) { + e.mu.Lock() + defer e.mu.Unlock() + // Cancel outstanding encoding requests + // Assumption: `CreateMinibatch` will be called at an interval longer than time it takes to encode a single blob + if len(e.encodingCtxCancelFuncs) > 0 { + e.logger.Info("canceling outstanding encoding requests", "count", len(e.encodingCtxCancelFuncs)) + for _, cancel := range 
e.encodingCtxCancelFuncs { + cancel() + } + e.encodingCtxCancelFuncs = make([]context.CancelFunc, 0) + } + + // Pop the latest encoded blobs and delete any stale results that are not from the current batching iteration (i.e. that has different reference block number) + // If any pending encoded results are discarded here, it will be re-requested in the next iteration + encodedResults := e.EncodedBlobstore.PopLatestEncodingResults(e.ReferenceBlockNumber) + + e.logger.Info("creating a batch...", "numBlobs", len(encodedResults), "refblockNumber", e.ReferenceBlockNumber) + if len(encodedResults) == 0 { + return nil, errNoEncodedResults + } + + encodedBlobByKey := make(map[disperser.BlobKey]core.EncodedBlob) + blobQuorums := make(map[disperser.BlobKey][]*core.BlobQuorumInfo) + blobHeaderByKey := make(map[disperser.BlobKey]*core.BlobHeader) + metadataByKey := make(map[disperser.BlobKey]*disperser.BlobMetadata) + for i := range encodedResults { + // each result represent an encoded result per (blob, quorum param) + // if the same blob has been dispersed multiple time with different security params, + // there will be multiple encoded results for that (blob, quorum) + result := encodedResults[i] + blobKey := result.BlobMetadata.GetBlobKey() + if _, ok := encodedBlobByKey[blobKey]; !ok { + metadataByKey[blobKey] = result.BlobMetadata + blobQuorums[blobKey] = make([]*core.BlobQuorumInfo, 0) + blobHeader := &core.BlobHeader{ + BlobCommitments: *result.Commitment, + } + blobHeaderByKey[blobKey] = blobHeader + encodedBlobByKey[blobKey] = core.EncodedBlob{ + BlobHeader: blobHeader, + BundlesByOperator: make(map[core.OperatorID]core.Bundles), + } + } + + // Populate the assigned bundles + for opID, assignment := range result.Assignments { + bundles, ok := encodedBlobByKey[blobKey].BundlesByOperator[opID] + if !ok { + encodedBlobByKey[blobKey].BundlesByOperator[opID] = make(core.Bundles) + bundles = encodedBlobByKey[blobKey].BundlesByOperator[opID] + } + 
bundles[result.BlobQuorumInfo.QuorumID] = append(bundles[result.BlobQuorumInfo.QuorumID], result.Chunks[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...) + } + + blobQuorums[blobKey] = append(blobQuorums[blobKey], result.BlobQuorumInfo) + } + + // Populate the blob quorum infos + for blobKey, encodedBlob := range encodedBlobByKey { + encodedBlob.BlobHeader.QuorumInfos = blobQuorums[blobKey] + } + + for blobKey, metadata := range metadataByKey { + quorumPresent := make(map[core.QuorumID]bool) + for _, quorum := range blobQuorums[blobKey] { + quorumPresent[quorum.QuorumID] = true + } + // Check if the blob has valid quorums. If any of the quorums are not valid, delete the blobKey + for _, quorum := range metadata.RequestMetadata.SecurityParams { + _, ok := quorumPresent[quorum.QuorumID] + if !ok { + // Delete the blobKey. These encoded blobs will be automatically removed by the next run of + // RequestEncoding + delete(metadataByKey, blobKey) + break + } + } + } + + if len(metadataByKey) == 0 { + return nil, errNoEncodedResults + } + + // Transform maps to slices so orders in different slices match + encodedBlobs := make([]core.EncodedBlob, len(metadataByKey)) + blobHeaders := make([]*core.BlobHeader, len(metadataByKey)) + metadatas := make([]*disperser.BlobMetadata, len(metadataByKey)) + i := 0 + for key := range metadataByKey { + err := e.transitionBlobToDispersing(ctx, metadataByKey[key]) + if err != nil { + continue + } + encodedBlobs[i] = encodedBlobByKey[key] + blobHeaders[i] = blobHeaderByKey[key] + metadatas[i] = metadataByKey[key] + i++ + } + + timeoutCtx, cancel := context.WithTimeout(context.Background(), e.ChainStateTimeout) + defer cancel() + + state, err := e.getOperatorState(timeoutCtx, metadatas, e.ReferenceBlockNumber) + if err != nil { + return nil, err + } + + // Populate the batch header + batchHeader := &core.BatchHeader{ + ReferenceBlockNumber: e.ReferenceBlockNumber, + BatchRoot: [32]byte{}, + } + + _, err = 
batchHeader.SetBatchRoot(blobHeaders) + if err != nil { + return nil, err + } + + return &batch{ + EncodedBlobs: encodedBlobs, + BatchHeader: batchHeader, + BlobHeaders: blobHeaders, + BlobMetadata: metadatas, + State: state, + }, nil +} + // CreateBatch makes a batch from all blobs in the encoded blob store. // If successful, it returns a batch, and updates the reference block number for next batch to use. // Otherwise, it returns an error and keeps the blobs in the encoded blob store. diff --git a/disperser/batcher/encoding_streamer_test.go b/disperser/batcher/encoding_streamer_test.go index c67a90d11e..8769ebb1b7 100644 --- a/disperser/batcher/encoding_streamer_test.go +++ b/disperser/batcher/encoding_streamer_test.go @@ -749,3 +749,171 @@ func TestGetBatch(t *testing.T) { assert.Contains(t, batch.BlobMetadata, metadata1) assert.Contains(t, batch.BlobMetadata, metadata2) } + +func TestCreateMinibatch(t *testing.T) { + encodingStreamer, c := createEncodingStreamer(t, 10, 1e12, streamerConfig) + ctx := context.Background() + + // put 2 blobs in the blobstore + blob1 := makeTestBlob([]*core.SecurityParam{{ + QuorumID: 0, + AdversaryThreshold: 80, + ConfirmationThreshold: 100, + }, { + QuorumID: 1, + AdversaryThreshold: 70, + ConfirmationThreshold: 95, + }}) + blob2 := makeTestBlob([]*core.SecurityParam{{ + QuorumID: 2, + AdversaryThreshold: 75, + ConfirmationThreshold: 100, + }}) + metadataKey1, err := c.blobStore.StoreBlob(ctx, &blob1, uint64(time.Now().UnixNano())) + assert.Nil(t, err) + metadata1, err := c.blobStore.GetBlobMetadata(ctx, metadataKey1) + assert.Nil(t, err) + assert.Equal(t, disperser.Processing, metadata1.BlobStatus) + metadataKey2, err := c.blobStore.StoreBlob(ctx, &blob2, uint64(time.Now().UnixNano())) + assert.Nil(t, err) + metadata2, err := c.blobStore.GetBlobMetadata(ctx, metadataKey2) + assert.Nil(t, err) + assert.Equal(t, disperser.Processing, metadata2.BlobStatus) + + // request encoding + out := make(chan batcher.EncodingResultOrStatus) + 
err = encodingStreamer.RequestEncoding(context.Background(), out) + assert.Nil(t, err) + isRequested := encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey1, core.QuorumID(0), 10) + assert.True(t, isRequested) + isRequested = encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey1, core.QuorumID(1), 10) + assert.True(t, isRequested) + isRequested = encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey2, core.QuorumID(2), 10) + assert.True(t, isRequested) + + err = encodingStreamer.ProcessEncodedBlobs(context.Background(), <-out) + assert.Nil(t, err) + err = encodingStreamer.ProcessEncodedBlobs(context.Background(), <-out) + assert.Nil(t, err) + err = encodingStreamer.ProcessEncodedBlobs(context.Background(), <-out) + assert.Nil(t, err) + encodingStreamer.Pool.StopWait() + + isRequested = encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey1, core.QuorumID(0), 10) + assert.True(t, isRequested) + isRequested = encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey1, core.QuorumID(1), 10) + assert.True(t, isRequested) + isRequested = encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey2, core.QuorumID(2), 10) + assert.True(t, isRequested) + + // get batch + assert.Equal(t, encodingStreamer.ReferenceBlockNumber, uint(10)) + batch, err := encodingStreamer.CreateMinibatch(context.Background()) + assert.Nil(t, err) + assert.NotNil(t, batch) + assert.Equal(t, encodingStreamer.ReferenceBlockNumber, uint(10)) + metadata1, err = c.blobStore.GetBlobMetadata(ctx, metadataKey1) + assert.Nil(t, err) + assert.Equal(t, disperser.Dispersing, metadata1.BlobStatus) + metadata2, err = c.blobStore.GetBlobMetadata(ctx, metadataKey2) + assert.Equal(t, disperser.Dispersing, metadata2.BlobStatus) + assert.Nil(t, err) + res, err := encodingStreamer.EncodedBlobstore.GetEncodingResult(metadataKey1, core.QuorumID(0)) + assert.Nil(t, res) + assert.ErrorContains(t, err, "GetEncodedBlob: no such key") + res, err = 
encodingStreamer.EncodedBlobstore.GetEncodingResult(metadataKey1, core.QuorumID(1)) + assert.Nil(t, res) + assert.ErrorContains(t, err, "GetEncodedBlob: no such key") + res, err = encodingStreamer.EncodedBlobstore.GetEncodingResult(metadataKey2, core.QuorumID(0)) + assert.Nil(t, res) + assert.ErrorContains(t, err, "GetEncodedBlob: no such key") + + err = encodingStreamer.UpdateReferenecBlock(15) + assert.Nil(t, err) + assert.Equal(t, encodingStreamer.ReferenceBlockNumber, uint(15)) + + // Check BatchHeader + assert.NotNil(t, batch.BatchHeader) + assert.Greater(t, len(batch.BatchHeader.BatchRoot), 0) + assert.Equal(t, batch.BatchHeader.ReferenceBlockNumber, uint(10)) + + // Check State + assert.NotNil(t, batch.State) + + // Check EncodedBlobs + assert.Len(t, batch.EncodedBlobs, 2) + assert.Len(t, batch.EncodedBlobs[0].BundlesByOperator, numOperators) + + var encodedBlob1 core.EncodedBlob + var encodedBlob2 core.EncodedBlob + for i := range batch.BlobHeaders { + blobHeader := batch.BlobHeaders[i] + if len(blobHeader.QuorumInfos) > 1 { + encodedBlob1 = batch.EncodedBlobs[i] + // batch.EncodedBlobs and batch.BlobMetadata should be in the same order + assert.ElementsMatch(t, batch.BlobMetadata[i].RequestMetadata.SecurityParams, blob1.RequestHeader.SecurityParams) + } else { + encodedBlob2 = batch.EncodedBlobs[i] + assert.ElementsMatch(t, batch.BlobMetadata[i].RequestMetadata.SecurityParams, blob2.RequestHeader.SecurityParams) + } + } + assert.NotNil(t, encodedBlob1) + assert.NotNil(t, encodedBlob2) + + assert.NotNil(t, encodedBlob1.BlobHeader) + assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments) + assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments.Commitment) + assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments.LengthProof) + assert.Equal(t, encodedBlob1.BlobHeader.BlobCommitments.Length, uint(48)) + assert.Len(t, encodedBlob1.BlobHeader.QuorumInfos, 2) + assert.ElementsMatch(t, encodedBlob1.BlobHeader.QuorumInfos, []*core.BlobQuorumInfo{ + { + 
SecurityParam: core.SecurityParam{ + QuorumID: 0, + AdversaryThreshold: 80, + ConfirmationThreshold: 100, + }, + ChunkLength: 16, + }, + { + SecurityParam: core.SecurityParam{ + QuorumID: 1, + AdversaryThreshold: 70, + ConfirmationThreshold: 95, + }, + ChunkLength: 8, + }, + }) + + assert.Contains(t, batch.BlobHeaders, encodedBlob1.BlobHeader) + for _, bundles := range encodedBlob1.BundlesByOperator { + assert.Len(t, bundles, 2) + assert.Greater(t, len(bundles[0]), 0) + assert.Greater(t, len(bundles[1]), 0) + break + } + + assert.NotNil(t, encodedBlob2.BlobHeader) + assert.NotNil(t, encodedBlob2.BlobHeader.BlobCommitments) + assert.NotNil(t, encodedBlob2.BlobHeader.BlobCommitments.Commitment) + assert.NotNil(t, encodedBlob2.BlobHeader.BlobCommitments.LengthProof) + assert.Equal(t, encodedBlob2.BlobHeader.BlobCommitments.Length, uint(48)) + assert.Len(t, encodedBlob2.BlobHeader.QuorumInfos, 1) + assert.ElementsMatch(t, encodedBlob2.BlobHeader.QuorumInfos, []*core.BlobQuorumInfo{{ + SecurityParam: core.SecurityParam{ + QuorumID: 2, + AdversaryThreshold: 75, + ConfirmationThreshold: 100, + }, + ChunkLength: 8, + }}) + for _, bundles := range encodedBlob2.BundlesByOperator { + assert.Len(t, bundles, 1) + assert.Greater(t, len(bundles[core.QuorumID(2)]), 0) + break + } + assert.Len(t, batch.BlobHeaders, 2) + assert.Len(t, batch.BlobMetadata, 2) + assert.Contains(t, batch.BlobMetadata, metadata1) + assert.Contains(t, batch.BlobMetadata, metadata2) +}