Skip to content

Commit

Permalink
encoding streamer create minibatch
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Jul 12, 2024
1 parent 660f0f1 commit fdae989
Show file tree
Hide file tree
Showing 3 changed files with 340 additions and 2 deletions.
26 changes: 26 additions & 0 deletions disperser/batcher/encoded_blob_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,32 @@ func (e *encodedBlobStore) DeleteEncodingResult(blobKey disperser.BlobKey, quoru
e.encodedResultSize -= getChunksSize(encodedResult)
}

// PopLatestEncodingResults returns all encoded results pending dispersal whose
// ReferenceBlockNumber equals refBlockNumber and removes them from the store.
// Results older than refBlockNumber are stale: they are deleted (and counted)
// but not returned; the corresponding blobs are expected to be re-requested
// for encoding by the caller's next iteration. Results newer than
// refBlockNumber should not exist and are only logged.
func (e *encodedBlobStore) PopLatestEncodingResults(refBlockNumber uint) []*EncodingResult {
	e.mu.Lock()
	defer e.mu.Unlock()

	fetched := make([]*EncodingResult, 0)
	staleCount := 0
	for k, encodedResult := range e.encoded {
		switch {
		case encodedResult.ReferenceBlockNumber == refBlockNumber:
			fetched = append(fetched, encodedResult)
			// Deleting entries while ranging over the map is safe:
			// https://go.dev/doc/effective_go#for
			delete(e.encoded, k)
			e.encodedResultSize -= getChunksSize(encodedResult)
		case encodedResult.ReferenceBlockNumber < refBlockNumber:
			// Stale result from a previous batching iteration; drop it.
			delete(e.encoded, k)
			staleCount++
			e.encodedResultSize -= getChunksSize(encodedResult)
		default:
			// Result is newer than the requested reference block; unexpected.
			// Use distinct log keys for the two values (previously both were
			// "refBlockNumber", which made the structured log ambiguous).
			e.logger.Error("unexpected case", "resultRefBlockNumber", encodedResult.ReferenceBlockNumber, "refBlockNumber", refBlockNumber)
		}
	}
	e.logger.Debug("consumed encoded results", "fetched", len(fetched), "stale", staleCount, "refBlockNumber", refBlockNumber, "encodedSize", e.encodedResultSize)

	return fetched
}

// GetNewAndDeleteStaleEncodingResults returns all the fresh encoded results that are pending dispersal, and deletes all the stale results that are older than the given block number
func (e *encodedBlobStore) GetNewAndDeleteStaleEncodingResults(blockNumber uint) []*EncodingResult {
e.mu.Lock()
Expand Down
148 changes: 146 additions & 2 deletions disperser/batcher/encoding_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,7 @@ func (e *EncodingStreamer) RequestEncodingForBlob(ctx context.Context, metadata
}
})
e.EncodedBlobstore.PutEncodingRequest(blobKey, res.BlobQuorumInfo.QuorumID)

}

}

func (e *EncodingStreamer) ProcessEncodedBlobs(ctx context.Context, result EncodingResultOrStatus) error {
Expand Down Expand Up @@ -420,6 +418,152 @@ func (e *EncodingStreamer) ProcessEncodedBlobs(ctx context.Context, result Encod
return nil
}

// UpdateReferenecBlock advances the streamer's reference block number to
// currentBlockNumber minus the finalization delay (floored so it never
// underflows). It returns an error if the new value would move the reference
// block backwards. When the reference block advances, any encoding results
// keyed to the previous reference block are discarded; they will be
// re-requested against the new reference block.
//
// NOTE: the exported name contains a typo ("Referenec"); it is kept for
// compatibility with existing callers.
func (e *EncodingStreamer) UpdateReferenecBlock(currentBlockNumber uint) error {
	blockNumber := currentBlockNumber
	if blockNumber > e.FinalizationBlockDelay {
		blockNumber -= e.FinalizationBlockDelay
	}
	e.mu.Lock()
	defer e.mu.Unlock()
	// Compare under the lock: previously ReferenceBlockNumber was read before
	// acquiring the mutex, racing with concurrent updates.
	if e.ReferenceBlockNumber > blockNumber {
		return fmt.Errorf("reference block number is being updated to a lower value: from %d to %d", e.ReferenceBlockNumber, blockNumber)
	}
	if e.ReferenceBlockNumber < blockNumber {
		// Wipe out the encoding results based on previous reference block number
		_ = e.EncodedBlobstore.PopLatestEncodingResults(e.ReferenceBlockNumber)
	}
	e.ReferenceBlockNumber = blockNumber
	return nil
}

// CreateMinibatch builds a batch from all encoded results keyed to the current
// reference block number. It cancels any outstanding encoding requests, pops
// the matching encoded results (deleting stale ones), groups them by blob,
// drops blobs that are missing a requested quorum, transitions the remaining
// blobs to Dispersing, and assembles the batch header and operator state.
// Returns errNoEncodedResults when nothing is ready to disperse.
func (e *EncodingStreamer) CreateMinibatch(ctx context.Context) (*batch, error) {
	e.mu.Lock()
	defer e.mu.Unlock()
	// Cancel outstanding encoding requests
	// Assumption: `CreateMinibatch` will be called at an interval longer than time it takes to encode a single blob
	if len(e.encodingCtxCancelFuncs) > 0 {
		e.logger.Info("canceling outstanding encoding requests", "count", len(e.encodingCtxCancelFuncs))
		for _, cancel := range e.encodingCtxCancelFuncs {
			cancel()
		}
		e.encodingCtxCancelFuncs = make([]context.CancelFunc, 0)
	}

	// Pop the latest encoded blobs and delete any stale results that are not from the current batching iteration (i.e. that has different reference block number)
	// If any pending encoded results are discarded here, it will be re-requested in the next iteration
	encodedResults := e.EncodedBlobstore.PopLatestEncodingResults(e.ReferenceBlockNumber)

	e.logger.Info("creating a batch...", "numBlobs", len(encodedResults), "refblockNumber", e.ReferenceBlockNumber)
	if len(encodedResults) == 0 {
		return nil, errNoEncodedResults
	}

	encodedBlobByKey := make(map[disperser.BlobKey]core.EncodedBlob)
	blobQuorums := make(map[disperser.BlobKey][]*core.BlobQuorumInfo)
	blobHeaderByKey := make(map[disperser.BlobKey]*core.BlobHeader)
	metadataByKey := make(map[disperser.BlobKey]*disperser.BlobMetadata)
	for i := range encodedResults {
		// each result represent an encoded result per (blob, quorum param)
		// if the same blob has been dispersed multiple time with different security params,
		// there will be multiple encoded results for that (blob, quorum)
		result := encodedResults[i]
		blobKey := result.BlobMetadata.GetBlobKey()
		if _, ok := encodedBlobByKey[blobKey]; !ok {
			metadataByKey[blobKey] = result.BlobMetadata
			blobQuorums[blobKey] = make([]*core.BlobQuorumInfo, 0)
			blobHeader := &core.BlobHeader{
				BlobCommitments: *result.Commitment,
			}
			blobHeaderByKey[blobKey] = blobHeader
			encodedBlobByKey[blobKey] = core.EncodedBlob{
				BlobHeader:        blobHeader,
				BundlesByOperator: make(map[core.OperatorID]core.Bundles),
			}
		}

		// Populate the assigned bundles
		for opID, assignment := range result.Assignments {
			bundles, ok := encodedBlobByKey[blobKey].BundlesByOperator[opID]
			if !ok {
				encodedBlobByKey[blobKey].BundlesByOperator[opID] = make(core.Bundles)
				bundles = encodedBlobByKey[blobKey].BundlesByOperator[opID]
			}
			bundles[result.BlobQuorumInfo.QuorumID] = append(bundles[result.BlobQuorumInfo.QuorumID], result.Chunks[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...)
		}

		blobQuorums[blobKey] = append(blobQuorums[blobKey], result.BlobQuorumInfo)
	}

	// Populate the blob quorum infos
	for blobKey, encodedBlob := range encodedBlobByKey {
		encodedBlob.BlobHeader.QuorumInfos = blobQuorums[blobKey]
	}

	for blobKey, metadata := range metadataByKey {
		quorumPresent := make(map[core.QuorumID]bool)
		for _, quorum := range blobQuorums[blobKey] {
			quorumPresent[quorum.QuorumID] = true
		}
		// Check if the blob has valid quorums. If any of the quorums are not valid, delete the blobKey
		for _, quorum := range metadata.RequestMetadata.SecurityParams {
			_, ok := quorumPresent[quorum.QuorumID]
			if !ok {
				// Delete the blobKey. These encoded blobs will be automatically removed by the next run of
				// RequestEncoding
				delete(metadataByKey, blobKey)
				break
			}
		}
	}

	if len(metadataByKey) == 0 {
		return nil, errNoEncodedResults
	}

	// Transform maps to slices so orders in different slices match
	encodedBlobs := make([]core.EncodedBlob, len(metadataByKey))
	blobHeaders := make([]*core.BlobHeader, len(metadataByKey))
	metadatas := make([]*disperser.BlobMetadata, len(metadataByKey))
	i := 0
	for key := range metadataByKey {
		err := e.transitionBlobToDispersing(ctx, metadataByKey[key])
		if err != nil {
			// Skip this blob; surface the failure instead of silently dropping it.
			e.logger.Error("error transitioning blob to dispersing", "blobKey", key, "err", err)
			continue
		}
		encodedBlobs[i] = encodedBlobByKey[key]
		blobHeaders[i] = blobHeaderByKey[key]
		metadatas[i] = metadataByKey[key]
		i++
	}

	if i == 0 {
		return nil, errNoEncodedResults
	}
	// Truncate to the blobs that actually transitioned to Dispersing. Without
	// this, failed transitions leave trailing zero-value entries (nil blob
	// headers) that would reach getOperatorState and SetBatchRoot below.
	encodedBlobs = encodedBlobs[:i]
	blobHeaders = blobHeaders[:i]
	metadatas = metadatas[:i]

	timeoutCtx, cancel := context.WithTimeout(context.Background(), e.ChainStateTimeout)
	defer cancel()

	state, err := e.getOperatorState(timeoutCtx, metadatas, e.ReferenceBlockNumber)
	if err != nil {
		return nil, err
	}

	// Populate the batch header
	batchHeader := &core.BatchHeader{
		ReferenceBlockNumber: e.ReferenceBlockNumber,
		BatchRoot:            [32]byte{},
	}

	_, err = batchHeader.SetBatchRoot(blobHeaders)
	if err != nil {
		return nil, err
	}

	return &batch{
		EncodedBlobs: encodedBlobs,
		BatchHeader:  batchHeader,
		BlobHeaders:  blobHeaders,
		BlobMetadata: metadatas,
		State:        state,
	}, nil
}

// CreateBatch makes a batch from all blobs in the encoded blob store.
// If successful, it returns a batch, and updates the reference block number for next batch to use.
// Otherwise, it returns an error and keeps the blobs in the encoded blob store.
Expand Down
168 changes: 168 additions & 0 deletions disperser/batcher/encoding_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,3 +749,171 @@ func TestGetBatch(t *testing.T) {
assert.Contains(t, batch.BlobMetadata, metadata1)
assert.Contains(t, batch.BlobMetadata, metadata2)
}

// TestCreateMinibatch exercises the full minibatch flow: store two blobs,
// request and process their encodings, create a minibatch at the current
// reference block, then verify the batch contents and that the encoded
// results were popped from the store.
func TestCreateMinibatch(t *testing.T) {
	encodingStreamer, c := createEncodingStreamer(t, 10, 1e12, streamerConfig)
	ctx := context.Background()

	// put 2 blobs in the blobstore
	blob1 := makeTestBlob([]*core.SecurityParam{{
		QuorumID:              0,
		AdversaryThreshold:    80,
		ConfirmationThreshold: 100,
	}, {
		QuorumID:              1,
		AdversaryThreshold:    70,
		ConfirmationThreshold: 95,
	}})
	blob2 := makeTestBlob([]*core.SecurityParam{{
		QuorumID:              2,
		AdversaryThreshold:    75,
		ConfirmationThreshold: 100,
	}})
	metadataKey1, err := c.blobStore.StoreBlob(ctx, &blob1, uint64(time.Now().UnixNano()))
	assert.Nil(t, err)
	metadata1, err := c.blobStore.GetBlobMetadata(ctx, metadataKey1)
	assert.Nil(t, err)
	assert.Equal(t, disperser.Processing, metadata1.BlobStatus)
	metadataKey2, err := c.blobStore.StoreBlob(ctx, &blob2, uint64(time.Now().UnixNano()))
	assert.Nil(t, err)
	metadata2, err := c.blobStore.GetBlobMetadata(ctx, metadataKey2)
	assert.Nil(t, err)
	assert.Equal(t, disperser.Processing, metadata2.BlobStatus)

	// request encoding
	out := make(chan batcher.EncodingResultOrStatus)
	err = encodingStreamer.RequestEncoding(context.Background(), out)
	assert.Nil(t, err)
	isRequested := encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey1, core.QuorumID(0), 10)
	assert.True(t, isRequested)
	isRequested = encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey1, core.QuorumID(1), 10)
	assert.True(t, isRequested)
	isRequested = encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey2, core.QuorumID(2), 10)
	assert.True(t, isRequested)

	// one encoded result per (blob, quorum): blob1 has two quorums, blob2 has one
	err = encodingStreamer.ProcessEncodedBlobs(context.Background(), <-out)
	assert.Nil(t, err)
	err = encodingStreamer.ProcessEncodedBlobs(context.Background(), <-out)
	assert.Nil(t, err)
	err = encodingStreamer.ProcessEncodedBlobs(context.Background(), <-out)
	assert.Nil(t, err)
	encodingStreamer.Pool.StopWait()

	isRequested = encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey1, core.QuorumID(0), 10)
	assert.True(t, isRequested)
	isRequested = encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey1, core.QuorumID(1), 10)
	assert.True(t, isRequested)
	isRequested = encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey2, core.QuorumID(2), 10)
	assert.True(t, isRequested)

	// get batch
	assert.Equal(t, encodingStreamer.ReferenceBlockNumber, uint(10))
	batch, err := encodingStreamer.CreateMinibatch(context.Background())
	assert.Nil(t, err)
	assert.NotNil(t, batch)
	assert.Equal(t, encodingStreamer.ReferenceBlockNumber, uint(10))
	metadata1, err = c.blobStore.GetBlobMetadata(ctx, metadataKey1)
	assert.Nil(t, err)
	assert.Equal(t, disperser.Dispersing, metadata1.BlobStatus)
	metadata2, err = c.blobStore.GetBlobMetadata(ctx, metadataKey2)
	assert.Nil(t, err)
	assert.Equal(t, disperser.Dispersing, metadata2.BlobStatus)
	// CreateMinibatch pops the encoded results, so none should remain.
	res, err := encodingStreamer.EncodedBlobstore.GetEncodingResult(metadataKey1, core.QuorumID(0))
	assert.Nil(t, res)
	assert.ErrorContains(t, err, "GetEncodedBlob: no such key")
	res, err = encodingStreamer.EncodedBlobstore.GetEncodingResult(metadataKey1, core.QuorumID(1))
	assert.Nil(t, res)
	assert.ErrorContains(t, err, "GetEncodedBlob: no such key")
	// blob2 was dispersed to quorum 2 (quorum 0 was never stored for it, so
	// checking quorum 0 would pass vacuously)
	res, err = encodingStreamer.EncodedBlobstore.GetEncodingResult(metadataKey2, core.QuorumID(2))
	assert.Nil(t, res)
	assert.ErrorContains(t, err, "GetEncodedBlob: no such key")

	err = encodingStreamer.UpdateReferenecBlock(15)
	assert.Nil(t, err)
	assert.Equal(t, encodingStreamer.ReferenceBlockNumber, uint(15))

	// Check BatchHeader
	assert.NotNil(t, batch.BatchHeader)
	assert.Greater(t, len(batch.BatchHeader.BatchRoot), 0)
	assert.Equal(t, batch.BatchHeader.ReferenceBlockNumber, uint(10))

	// Check State
	assert.NotNil(t, batch.State)

	// Check EncodedBlobs
	assert.Len(t, batch.EncodedBlobs, 2)
	assert.Len(t, batch.EncodedBlobs[0].BundlesByOperator, numOperators)

	var encodedBlob1 core.EncodedBlob
	var encodedBlob2 core.EncodedBlob
	for i := range batch.BlobHeaders {
		blobHeader := batch.BlobHeaders[i]
		if len(blobHeader.QuorumInfos) > 1 {
			encodedBlob1 = batch.EncodedBlobs[i]
			// batch.EncodedBlobs and batch.BlobMetadata should be in the same order
			assert.ElementsMatch(t, batch.BlobMetadata[i].RequestMetadata.SecurityParams, blob1.RequestHeader.SecurityParams)
		} else {
			encodedBlob2 = batch.EncodedBlobs[i]
			assert.ElementsMatch(t, batch.BlobMetadata[i].RequestMetadata.SecurityParams, blob2.RequestHeader.SecurityParams)
		}
	}
	assert.NotNil(t, encodedBlob1)
	assert.NotNil(t, encodedBlob2)

	assert.NotNil(t, encodedBlob1.BlobHeader)
	assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments)
	assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments.Commitment)
	assert.NotNil(t, encodedBlob1.BlobHeader.BlobCommitments.LengthProof)
	assert.Equal(t, encodedBlob1.BlobHeader.BlobCommitments.Length, uint(48))
	assert.Len(t, encodedBlob1.BlobHeader.QuorumInfos, 2)
	assert.ElementsMatch(t, encodedBlob1.BlobHeader.QuorumInfos, []*core.BlobQuorumInfo{
		{
			SecurityParam: core.SecurityParam{
				QuorumID:              0,
				AdversaryThreshold:    80,
				ConfirmationThreshold: 100,
			},
			ChunkLength: 16,
		},
		{
			SecurityParam: core.SecurityParam{
				QuorumID:              1,
				AdversaryThreshold:    70,
				ConfirmationThreshold: 95,
			},
			ChunkLength: 8,
		},
	})

	assert.Contains(t, batch.BlobHeaders, encodedBlob1.BlobHeader)
	for _, bundles := range encodedBlob1.BundlesByOperator {
		assert.Len(t, bundles, 2)
		assert.Greater(t, len(bundles[0]), 0)
		assert.Greater(t, len(bundles[1]), 0)
		break
	}

	assert.NotNil(t, encodedBlob2.BlobHeader)
	assert.NotNil(t, encodedBlob2.BlobHeader.BlobCommitments)
	assert.NotNil(t, encodedBlob2.BlobHeader.BlobCommitments.Commitment)
	assert.NotNil(t, encodedBlob2.BlobHeader.BlobCommitments.LengthProof)
	assert.Equal(t, encodedBlob2.BlobHeader.BlobCommitments.Length, uint(48))
	assert.Len(t, encodedBlob2.BlobHeader.QuorumInfos, 1)
	assert.ElementsMatch(t, encodedBlob2.BlobHeader.QuorumInfos, []*core.BlobQuorumInfo{{
		SecurityParam: core.SecurityParam{
			QuorumID:              2,
			AdversaryThreshold:    75,
			ConfirmationThreshold: 100,
		},
		ChunkLength: 8,
	}})
	for _, bundles := range encodedBlob2.BundlesByOperator {
		assert.Len(t, bundles, 1)
		assert.Greater(t, len(bundles[core.QuorumID(2)]), 0)
		break
	}
	assert.Len(t, batch.BlobHeaders, 2)
	assert.Len(t, batch.BlobMetadata, 2)
	assert.Contains(t, batch.BlobMetadata, metadata1)
	assert.Contains(t, batch.BlobMetadata, metadata2)
}

0 comments on commit fdae989

Please sign in to comment.