Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/Layr-Labs/eigenda into ch…
Browse files Browse the repository at this point in the history
…unkencodingdisperser
  • Loading branch information
jianoaix committed Jul 19, 2024
2 parents 9fd42a3 + 5b1fbad commit ce28116
Show file tree
Hide file tree
Showing 14 changed files with 1,369 additions and 224 deletions.
761 changes: 539 additions & 222 deletions api/grpc/node/node.pb.go

Large diffs are not rendered by default.

84 changes: 84 additions & 0 deletions api/grpc/node/node_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 35 additions & 0 deletions api/proto/node/node.proto
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
syntax = "proto3";
package node;
import "google/protobuf/wrappers.proto";
import "common/common.proto";
option go_package = "github.com/Layr-Labs/eigenda/api/grpc/node";

Expand All @@ -14,6 +15,13 @@ service Dispersal {
// for the protocol-defined length of custody. It will return a signature at the
// end to attest to the data in this request it has processed.
rpc StoreChunks(StoreChunksRequest) returns (StoreChunksReply) {}
// StoreBlobs is simiar to StoreChunks, but it stores the blobs using a different storage schema
// so that the stored blobs can later be aggregated by AttestBatch method to a bigger batch.
// StoreBlobs + AttestBatch will eventually replace and deprecate StoreChunks method.
rpc StoreBlobs(StoreBlobsRequest) returns (StoreBlobsReply) {}
// AttestBatch is used to aggregate the batches stored by StoreBlobs method to a bigger batch.
// It will return a signature at the end to attest to the aggregated batch.
rpc AttestBatch(AttestBatchRequest) returns (AttestBatchReply) {}
// Retrieve node info metadata
rpc NodeInfo(NodeInfoRequest) returns (NodeInfoReply) {}
}
Expand Down Expand Up @@ -41,6 +49,31 @@ message StoreChunksReply {
bytes signature = 1;
}

message StoreBlobsRequest {
// Blobs to store
repeated Blob blobs = 1;
// The reference block number whose state is used to encode the blobs
uint32 reference_block_number = 2;
}

message StoreBlobsReply {
// The operator's BLS sgnature signed on the blob header hashes.
// The ordering of the signatures must match the ordering of the blobs sent
// in the request, with empty signatures in the places for discarded blobs.
repeated google.protobuf.BytesValue signatures = 1;
}

message AttestBatchRequest {
// header of the batch
BatchHeader batch_header = 1;
// the header hashes of all blobs in the batch
repeated bytes blob_header_hashes = 2;
}

message AttestBatchReply {
bytes signature = 1;
}

message RetrieveChunksRequest {
// The hash of the ReducedBatchHeader defined onchain, see:
// https://github.com/Layr-Labs/eigenda/blob/master/contracts/src/interfaces/IEigenDAServiceManager.sol#L43
Expand Down Expand Up @@ -149,6 +182,8 @@ message BlobHeader {
repeated BlobQuorumInfo quorum_headers = 5;
// The ID of the user who is dispersing this blob to EigenDA.
string account_id = 6;
// The reference block number whose state is used to encode the blob
uint32 reference_block_number = 7;
}

// See BlobQuorumParam as defined in
Expand Down
26 changes: 26 additions & 0 deletions disperser/batcher/encoded_blob_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,32 @@ func (e *encodedBlobStore) DeleteEncodingResult(blobKey disperser.BlobKey, quoru
e.encodedResultSize -= getChunksSize(encodedResult)
}

// PopLatestEncodingResults returns all the encoded results that are pending dispersal and deletes them along with stale results that are older than the given reference block
func (e *encodedBlobStore) PopLatestEncodingResults(refBlockNumber uint) []*EncodingResult {
e.mu.Lock()
defer e.mu.Unlock()

fetched := make([]*EncodingResult, 0)
staleCount := 0
for k, encodedResult := range e.encoded {
if encodedResult.ReferenceBlockNumber == refBlockNumber {
fetched = append(fetched, encodedResult)
// this is safe: https://go.dev/doc/effective_go#for
delete(e.encoded, k)
e.encodedResultSize -= getChunksSize(encodedResult)
} else if encodedResult.ReferenceBlockNumber < refBlockNumber {
delete(e.encoded, k)
staleCount++
e.encodedResultSize -= getChunksSize(encodedResult)
} else {
e.logger.Error("unexpected case", "refBlockNumber", encodedResult.ReferenceBlockNumber, "refBlockNumber", refBlockNumber)
}
}
e.logger.Debug("consumed encoded results", "fetched", len(fetched), "stale", staleCount, "refBlockNumber", refBlockNumber, "encodedSize", e.encodedResultSize)

return fetched
}

// GetNewAndDeleteStaleEncodingResults returns all the fresh encoded results that are pending dispersal, and deletes all the stale results that are older than the given block number
func (e *encodedBlobStore) GetNewAndDeleteStaleEncodingResults(blockNumber uint) []*EncodingResult {
e.mu.Lock()
Expand Down
148 changes: 146 additions & 2 deletions disperser/batcher/encoding_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,7 @@ func (e *EncodingStreamer) RequestEncodingForBlob(ctx context.Context, metadata
}
})
e.EncodedBlobstore.PutEncodingRequest(blobKey, res.BlobQuorumInfo.QuorumID)

}

}

func (e *EncodingStreamer) ProcessEncodedBlobs(ctx context.Context, result EncodingResultOrStatus) error {
Expand Down Expand Up @@ -420,6 +418,152 @@ func (e *EncodingStreamer) ProcessEncodedBlobs(ctx context.Context, result Encod
return nil
}

func (e *EncodingStreamer) UpdateReferenecBlock(currentBlockNumber uint) error {
blockNumber := currentBlockNumber
if blockNumber > e.FinalizationBlockDelay {
blockNumber -= e.FinalizationBlockDelay
}
if e.ReferenceBlockNumber > blockNumber {
return fmt.Errorf("reference block number is being updated to a lower value: from %d to %d", e.ReferenceBlockNumber, blockNumber)
}
e.mu.Lock()
defer e.mu.Unlock()
if e.ReferenceBlockNumber < blockNumber {
// Wipe out the encoding results based on previous reference block number
_ = e.EncodedBlobstore.PopLatestEncodingResults(e.ReferenceBlockNumber)
}
e.ReferenceBlockNumber = blockNumber
return nil
}

func (e *EncodingStreamer) CreateMinibatch(ctx context.Context) (*batch, error) {
e.mu.Lock()
defer e.mu.Unlock()
// Cancel outstanding encoding requests
// Assumption: `CreateMinibatch` will be called at an interval longer than time it takes to encode a single blob
if len(e.encodingCtxCancelFuncs) > 0 {
e.logger.Info("canceling outstanding encoding requests", "count", len(e.encodingCtxCancelFuncs))
for _, cancel := range e.encodingCtxCancelFuncs {
cancel()
}
e.encodingCtxCancelFuncs = make([]context.CancelFunc, 0)
}

// Pop the latest encoded blobs and delete any stale results that are not from the current batching iteration (i.e. that has different reference block number)
// If any pending encoded results are discarded here, it will be re-requested in the next iteration
encodedResults := e.EncodedBlobstore.PopLatestEncodingResults(e.ReferenceBlockNumber)

e.logger.Info("creating a batch...", "numBlobs", len(encodedResults), "refblockNumber", e.ReferenceBlockNumber)
if len(encodedResults) == 0 {
return nil, errNoEncodedResults
}

encodedBlobByKey := make(map[disperser.BlobKey]core.EncodedBlob)
blobQuorums := make(map[disperser.BlobKey][]*core.BlobQuorumInfo)
blobHeaderByKey := make(map[disperser.BlobKey]*core.BlobHeader)
metadataByKey := make(map[disperser.BlobKey]*disperser.BlobMetadata)
for i := range encodedResults {
// each result represent an encoded result per (blob, quorum param)
// if the same blob has been dispersed multiple time with different security params,
// there will be multiple encoded results for that (blob, quorum)
result := encodedResults[i]
blobKey := result.BlobMetadata.GetBlobKey()
if _, ok := encodedBlobByKey[blobKey]; !ok {
metadataByKey[blobKey] = result.BlobMetadata
blobQuorums[blobKey] = make([]*core.BlobQuorumInfo, 0)
blobHeader := &core.BlobHeader{
BlobCommitments: *result.Commitment,
}
blobHeaderByKey[blobKey] = blobHeader
encodedBlobByKey[blobKey] = core.EncodedBlob{
BlobHeader: blobHeader,
BundlesByOperator: make(map[core.OperatorID]core.Bundles),
}
}

// Populate the assigned bundles
for opID, assignment := range result.Assignments {
bundles, ok := encodedBlobByKey[blobKey].BundlesByOperator[opID]
if !ok {
encodedBlobByKey[blobKey].BundlesByOperator[opID] = make(core.Bundles)
bundles = encodedBlobByKey[blobKey].BundlesByOperator[opID]
}
bundles[result.BlobQuorumInfo.QuorumID] = append(bundles[result.BlobQuorumInfo.QuorumID], result.Chunks[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...)
}

blobQuorums[blobKey] = append(blobQuorums[blobKey], result.BlobQuorumInfo)
}

// Populate the blob quorum infos
for blobKey, encodedBlob := range encodedBlobByKey {
encodedBlob.BlobHeader.QuorumInfos = blobQuorums[blobKey]
}

for blobKey, metadata := range metadataByKey {
quorumPresent := make(map[core.QuorumID]bool)
for _, quorum := range blobQuorums[blobKey] {
quorumPresent[quorum.QuorumID] = true
}
// Check if the blob has valid quorums. If any of the quorums are not valid, delete the blobKey
for _, quorum := range metadata.RequestMetadata.SecurityParams {
_, ok := quorumPresent[quorum.QuorumID]
if !ok {
// Delete the blobKey. These encoded blobs will be automatically removed by the next run of
// RequestEncoding
delete(metadataByKey, blobKey)
break
}
}
}

if len(metadataByKey) == 0 {
return nil, errNoEncodedResults
}

// Transform maps to slices so orders in different slices match
encodedBlobs := make([]core.EncodedBlob, len(metadataByKey))
blobHeaders := make([]*core.BlobHeader, len(metadataByKey))
metadatas := make([]*disperser.BlobMetadata, len(metadataByKey))
i := 0
for key := range metadataByKey {
err := e.transitionBlobToDispersing(ctx, metadataByKey[key])
if err != nil {
continue
}
encodedBlobs[i] = encodedBlobByKey[key]
blobHeaders[i] = blobHeaderByKey[key]
metadatas[i] = metadataByKey[key]
i++
}

timeoutCtx, cancel := context.WithTimeout(context.Background(), e.ChainStateTimeout)
defer cancel()

state, err := e.getOperatorState(timeoutCtx, metadatas, e.ReferenceBlockNumber)
if err != nil {
return nil, err
}

// Populate the batch header
batchHeader := &core.BatchHeader{
ReferenceBlockNumber: e.ReferenceBlockNumber,
BatchRoot: [32]byte{},
}

_, err = batchHeader.SetBatchRoot(blobHeaders)
if err != nil {
return nil, err
}

return &batch{
EncodedBlobs: encodedBlobs,
BatchHeader: batchHeader,
BlobHeaders: blobHeaders,
BlobMetadata: metadatas,
State: state,
}, nil
}

// CreateBatch makes a batch from all blobs in the encoded blob store.
// If successful, it returns a batch, and updates the reference block number for next batch to use.
// Otherwise, it returns an error and keeps the blobs in the encoded blob store.
Expand Down
Loading

0 comments on commit ce28116

Please sign in to comment.