diff --git a/api/clients/mock/retrieval_client.go b/api/clients/mock/retrieval_client.go index 5dfd326083..3c54612573 100644 --- a/api/clients/mock/retrieval_client.go +++ b/api/clients/mock/retrieval_client.go @@ -44,7 +44,7 @@ func (c *MockRetrievalClient) RetrieveBlobChunks( batchRoot [32]byte, quorumID core.QuorumID) (*clients.BlobChunks, error) { - return nil, nil // TODO + return nil, nil } func (c *MockRetrievalClient) CombineChunks(chunks *clients.BlobChunks) ([]byte, error) { diff --git a/tools/traffic/blob_metadata.go b/tools/traffic/blob_metadata.go index d3e07ec455..2d71749210 100644 --- a/tools/traffic/blob_metadata.go +++ b/tools/traffic/blob_metadata.go @@ -8,6 +8,9 @@ type BlobMetadata struct { // checksum of the blob. checksum *[16]byte + // size of the blob in bytes. + size uint + // batchHeaderHash of the blob. batchHeaderHash *[]byte @@ -27,6 +30,7 @@ type BlobMetadata struct { func NewBlobMetadata( key *[]byte, checksum *[16]byte, + size uint, batchHeaderHash *[]byte, blobIndex uint32, readPermits int32) *BlobMetadata { @@ -34,6 +38,7 @@ func NewBlobMetadata( return &BlobMetadata{ key: key, checksum: checksum, + size: size, batchHeaderHash: batchHeaderHash, blobIndex: blobIndex, remainingReadPermits: readPermits, diff --git a/tools/traffic/blob_reader.go b/tools/traffic/blob_reader.go index e1ea38c323..ad051492b7 100644 --- a/tools/traffic/blob_reader.go +++ b/tools/traffic/blob_reader.go @@ -8,7 +8,6 @@ import ( contractEigenDAServiceManager "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDAServiceManager" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/encoding" - "github.com/Layr-Labs/eigenda/encoding/utils/codec" "github.com/Layr-Labs/eigenda/retriever/eth" "github.com/Layr-Labs/eigensdk-go/logging" gcommon "github.com/ethereum/go-ethereum/common" @@ -188,7 +187,7 @@ func (reader *BlobReader) randomRead() { } reader.recombinationSuccessMetric.Increment() - reader.verifyBlob(metadata, data) + 
reader.verifyBlob(metadata, &data) indexSet := make(map[encoding.ChunkNumber]bool) for index := range chunks.Indices { @@ -229,10 +228,10 @@ func (reader *BlobReader) reportMissingChunk(operatorId core.OperatorID) { } // verifyBlob performs sanity checks on the blob. -func (reader *BlobReader) verifyBlob(metadata *BlobMetadata, blob []byte) { - recomputedChecksum := md5.Sum(codec.RemoveEmptyByteFromPaddedBytes(blob)) - - fmt.Printf("metadata.checksum: %x, recomputed checksum: %x\n", *metadata.checksum, recomputedChecksum) +func (reader *BlobReader) verifyBlob(metadata *BlobMetadata, blob *[]byte) { + // Trim off the padding. + truncatedBlob := (*blob)[:metadata.size] + recomputedChecksum := md5.Sum(truncatedBlob) if *metadata.checksum == recomputedChecksum { reader.validBlobMetric.Increment() diff --git a/tools/traffic/blob_verifier.go b/tools/traffic/blob_verifier.go index 3954011df6..b3fbc1c50f 100644 --- a/tools/traffic/blob_verifier.go +++ b/tools/traffic/blob_verifier.go @@ -14,6 +14,8 @@ import ( type unconfirmedKey struct { // The key of the blob. key *[]byte + // The size of the blob in bytes. + size uint // The checksum of the blob. checksum *[16]byte // The time the blob was submitted to the disperser service. @@ -96,10 +98,11 @@ func NewStatusVerifier( } // AddUnconfirmedKey adds a key to the list of unconfirmed keys. -func (verifier *BlobVerifier) AddUnconfirmedKey(key *[]byte, checksum *[16]byte) { +func (verifier *BlobVerifier) AddUnconfirmedKey(key *[]byte, checksum *[16]byte, size uint) { verifier.keyChannel <- &unconfirmedKey{ key: key, checksum: checksum, + size: size, submissionTime: time.Now(), } } @@ -131,7 +134,8 @@ func (verifier *BlobVerifier) monitor(period time.Duration) { // If a key is confirmed, it is added to the blob table and removed from the list of unconfirmed keys. func (verifier *BlobVerifier) poll() { - // TODO If the number of unconfirmed blobs is high and the time to confirm his high, this is not efficient. 
+ // FUTURE WORK If the number of unconfirmed blobs is high and the time to confirm is high, this is not efficient. + // Revisit this method if there are performance problems. unconfirmedKeys := make([]*unconfirmedKey, 0) for _, key := range verifier.unconfirmedKeys { @@ -226,6 +230,6 @@ func (verifier *BlobVerifier) forwardToReader(key *unconfirmedKey, status *dispe downloadCount = int32(requiredDownloads) } - blobMetadata := NewBlobMetadata(key.key, key.checksum, &batchHeaderHash, blobIndex, downloadCount) + blobMetadata := NewBlobMetadata(key.key, key.checksum, key.size, &batchHeaderHash, blobIndex, downloadCount) verifier.table.Add(blobMetadata) } diff --git a/tools/traffic/blob_writer.go b/tools/traffic/blob_writer.go index 004723ee0b..05b4c5aa53 100644 --- a/tools/traffic/blob_writer.go +++ b/tools/traffic/blob_writer.go @@ -113,8 +113,10 @@ func (writer *BlobWriter) run() { writer.writeSuccessMetric.Increment() + fmt.Printf(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Data size = %d\n", len(*data)) + checksum := md5.Sum(*data) - writer.verifier.AddUnconfirmedKey(&key, &checksum) + writer.verifier.AddUnconfirmedKey(&key, &checksum, uint(len(*data))) } } }