Added blob table data structure. #677

Merged · 8 commits · Aug 6, 2024
Changes from 4 commits
71 changes: 71 additions & 0 deletions tools/traffic/table/blob_metadata.go
@@ -0,0 +1,71 @@
package table

// BlobMetadata encapsulates various information about a blob written by the traffic generator.
type BlobMetadata struct {
	// key of the blob, set when the blob is initially uploaded.
	key []byte
Contributor:
Is this the blobKey?

	type BlobKey struct {

Contributor Author (@cody-littley, Aug 5, 2024):
The key, in this context, is the []byte return value of this method:

	func (c *disperserClient) DisperseBlob(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) {

The BlobKey struct appears to hold the same data, although in string form rather than as a byte array. I originally had the MetadataHash field as well, although that was removed based on a prior comment.

Should I be using the BlobKey struct here?


	// checksum of the blob.
	checksum *[16]byte
Contributor:
Does this need to be a pointer?

Contributor Author:
Changed to a non-pointer variable.


	// size of the blob, in bytes.
	size uint

	// blobIndex is the position of this blob within its batch.
	blobIndex uint
Contributor:
What is blob index?

Contributor Author:
Blob index is one of the arguments needed when retrieving a blob. I am unsure of the deeper meaning of this field; I figured out I needed it through reverse engineering.

	func (r *retrievalClient) RetrieveBlob(
		ctx context.Context,
		batchHeaderHash [32]byte,
		blobIndex uint32,  👈
		referenceBlockNumber uint,
		batchRoot [32]byte,
		quorumID core.QuorumID) ([]byte, error) {

Contributor:
Yea, this index is the position of the blob in the batch (the batch is a list of blobs).


	// remainingReadPermits describes the maximum number of remaining reads permitted against this blob.
	// If -1 then an unlimited number of reads are permitted.
	remainingReadPermits int

	// index describes the position of this blob within the BlobTable.
	index uint
}
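
As a concrete illustration of the thread above, here is a hypothetical helper showing how the stored blobIndex would feed the quoted retrieval signature. The blobRetriever interface below merely restates that signature so the sketch stands alone; the batch header hash, batch root, and reference block number are not stored in BlobMetadata and would have to come from the confirmed batch. It assumes imports of "context" and the eigenda core package for core.QuorumID.

// blobRetriever restates the RetrieveBlob signature quoted in the review thread.
type blobRetriever interface {
	RetrieveBlob(
		ctx context.Context,
		batchHeaderHash [32]byte,
		blobIndex uint32,
		referenceBlockNumber uint,
		batchRoot [32]byte,
		quorumID core.QuorumID) ([]byte, error)
}

// retrieveTrackedBlob is a hypothetical helper: the only value BlobMetadata
// contributes to the call is blobIndex; the batch parameters must be supplied
// by the caller.
func retrieveTrackedBlob(
	ctx context.Context,
	client blobRetriever,
	metadata *BlobMetadata,
	batchHeaderHash [32]byte,
	referenceBlockNumber uint,
	batchRoot [32]byte,
	quorumID core.QuorumID) ([]byte, error) {

	return client.RetrieveBlob(ctx, batchHeaderHash, uint32(metadata.blobIndex),
		referenceBlockNumber, batchRoot, quorumID)
}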

// NewBlobMetadata creates a new BlobMetadata instance. The readPermits parameter describes the maximum number of
// remaining reads permitted against this blob. If -1 then an unlimited number of reads are permitted.
func NewBlobMetadata(
	key []byte,
	checksum *[16]byte,
	size uint,
	blobIndex uint,
	readPermits int) *BlobMetadata {

	if readPermits == 0 {
		panic("readPermits must be greater than 0, or -1 for unlimited reads")
Contributor:
We could return an error instead of crashing when validation fails.

Contributor Author:
Change made.

	}

	return &BlobMetadata{
		key:                  key,
		checksum:             checksum,
		size:                 size,
		blobIndex:            blobIndex,
		remainingReadPermits: readPermits,
		index:                0,
	}
}
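
Following the exchange above about returning an error instead of panicking, a minimal sketch of what the revised constructor might look like (the actual change lands in a later commit that is not part of this four-commit view; it assumes "errors" is added to the file's imports):

// NewBlobMetadata creates a new BlobMetadata instance. readPermits must be -1
// (unlimited reads) or greater than 0; a value of 0 is rejected with an error
// rather than a panic.
func NewBlobMetadata(
	key []byte,
	checksum *[16]byte,
	size uint,
	blobIndex uint,
	readPermits int) (*BlobMetadata, error) {

	if readPermits == 0 {
		return nil, errors.New("readPermits must be greater than 0, or -1 for unlimited reads")
	}

	return &BlobMetadata{
		key:                  key,
		checksum:             checksum,
		size:                 size,
		blobIndex:            blobIndex,
		remainingReadPermits: readPermits,
	}, nil
}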

// Key returns the key of the blob.
Contributor:
What about making the member variables public? These getters are quite simple, so they seem not worth the verbosity.

Contributor Author:
The intention was to make it possible to read these values without the capability of updating them. But maybe that's more of a Java design pattern. I've made the member variables public as you suggest.

Contributor:
Yea, it feels quite Java-ish.

func (blob *BlobMetadata) Key() []byte {
	return blob.key
}

// Checksum returns the checksum of the blob.
func (blob *BlobMetadata) Checksum() *[16]byte {
	return blob.checksum
}

// Size returns the size of the blob, in bytes.
func (blob *BlobMetadata) Size() uint {
	return blob.size
}

// BlobIndex returns the blobIndex of the blob.
func (blob *BlobMetadata) BlobIndex() uint {
	return blob.blobIndex
}

// RemainingReadPermits returns the maximum number of remaining reads permitted against this blob.
func (blob *BlobMetadata) RemainingReadPermits() int {
	return blob.remainingReadPermits
}
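
For reference, a sketch of what the struct might look like once the fields are exported per the discussion above (hypothetical; the exact shape is in the later commits, not in this diff):

// BlobMetadata with exported fields, which removes the need for the getters above.
type BlobMetadata struct {
	// Key of the blob, set when the blob is initially uploaded.
	Key []byte

	// Checksum of the blob.
	Checksum [16]byte

	// Size of the blob, in bytes.
	Size uint

	// BlobIndex is the position of this blob within its batch.
	BlobIndex uint

	// RemainingReadPermits is the maximum number of remaining reads permitted
	// against this blob, or -1 for unlimited reads.
	RemainingReadPermits int

	// index describes the position of this blob within the BlobTable.
	// Left unexported since only the table needs to mutate it.
	index uint
}
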
140 changes: 140 additions & 0 deletions tools/traffic/table/blob_table.go
@@ -0,0 +1,140 @@
package table

import (
	"fmt"
	"math/rand"
	"sync"
)

// BlobTable tracks blobs written by the traffic generator. This is a thread-safe data structure.
type BlobTable struct {

	// blobs contains all blobs currently tracked by the table.
	blobs []*BlobMetadata

	// size describes the total number of blobs currently tracked by the table.
	// size may be smaller than the capacity of the blobs slice.
	size uint

	// lock is used to synchronize access to the table.
	lock sync.RWMutex
}

// NewBlobTable creates a new BlobTable instance.
func NewBlobTable() BlobTable {
	return BlobTable{
		blobs: make([]*BlobMetadata, 0),
		size:  0,
	}
}

// Size returns the total number of blobs currently tracked by the table.
func (table *BlobTable) Size() uint {
	table.lock.RLock()
	defer table.lock.RUnlock()

	return table.size
}

// Get returns the blob at the specified index. Returns nil if the index is out of bounds.
func (table *BlobTable) Get(index uint) *BlobMetadata {
	table.lock.RLock()
	defer table.lock.RUnlock()

	if index >= table.size {
		return nil
	}

	return table.blobs[index]
}

// Add a blob to the table.
func (table *BlobTable) Add(blob *BlobMetadata) {
	table.lock.Lock()
	defer table.lock.Unlock()

	blob.index = table.size

	if table.size == uint(len(table.blobs)) {
		table.blobs = append(table.blobs, blob)
	} else {
		table.blobs[table.size] = blob
	}

	table.size++
}

// AddOrReplace is equivalent to Add if there is capacity, or replaces an existing blob at random
// if there is no remaining capacity. This method is a no-op if maximumCapacity is 0.
func (table *BlobTable) AddOrReplace(blob *BlobMetadata, maximumCapacity uint) {
	if maximumCapacity == 0 {
		return
	}

	table.lock.Lock()
	defer table.lock.Unlock()

	if table.size >= maximumCapacity {
		// replace a random existing blob
Contributor:
What if the existing blob hasn't been retrieved the required number of times?

Contributor Author:
Then it is removed.

This is not a problem for our use case though, since we keep two blob tables: one for required blobs, and another for optional blobs. The code never calls AddOrReplace() on the table that contains the required blobs.

		index := rand.Int31n(int32(table.size))
		table.blobs[index] = blob
		blob.index = uint(index)
	} else {
		// add new blob
		blob.index = table.size
		if table.size == uint(len(table.blobs)) {
			table.blobs = append(table.blobs, blob)
		} else {
			table.blobs[table.size] = blob
		}
		table.size++
	}
}
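
To make the two-table arrangement described in the thread above concrete, here is a hypothetical wiring sketch. The function and constant names are illustrative and not taken from this PR; the constructor is used with the signature it has in this diff.

// trackConfirmedBlob records a confirmed blob in both tables. Separate
// BlobMetadata instances are created for each table, because each table stores
// the blob's position in the metadata's index field.
func trackConfirmedBlob(
	requiredReads *BlobTable,
	optionalReads *BlobTable,
	key []byte,
	checksum *[16]byte,
	size uint,
	blobIndex uint) {

	const requiredReadsPerBlob = 2 // illustrative value, not from this PR
	const optionalPoolSize = 1024  // illustrative value, not from this PR

	// Required reads: the blob must be read a fixed number of times and is
	// removed once its permits are exhausted. Add never evicts.
	requiredReads.Add(NewBlobMetadata(key, checksum, size, blobIndex, requiredReadsPerBlob))

	// Optional reads: unlimited permits, but the pool is bounded and uses
	// AddOrReplace so old blobs are evicted at random once the pool is full.
	optionalReads.AddOrReplace(NewBlobMetadata(key, checksum, size, blobIndex, -1), optionalPoolSize)
}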

// GetRandom returns a random blob currently tracked by the table. Returns nil if the table is empty.
// Optionally decrements the read permits of the blob if decrement is true. If the number of read permits
// reaches 0, the blob is removed from the table. Returns the blob metadata (if there is at least one blob
// in the table) and a boolean indicating whether the blob was removed from the table as a result of this operation.
func (table *BlobTable) GetRandom(decrement bool) (*BlobMetadata, bool) {
	if decrement {
		table.lock.Lock()
		defer table.lock.Unlock()
	} else {
		table.lock.RLock()
		defer table.lock.RUnlock()
	}

	if table.size == 0 {
		return nil, false
	}

	blob := table.blobs[rand.Int31n(int32(table.size))]

	removed := false
	if decrement && blob.remainingReadPermits != -1 {
Contributor:
Shouldn't the condition be blob.remainingReadPermits != -1?
Say blob.remainingReadPermits is 0 before this call. Then it gets decremented to -1 inside this block and is never removed.

Contributor Author:
Read permits for blobs in the table are never 0. They are always -1 or greater than 0, and removed the moment the count reaches 0.

Just in case, I added an assertion in the NewBlobMetadata() method to validate that this invariant is not violated.

	if readPermits == 0 {
		panic("readPermits must be greater than 0, or -1 for unlimited reads")
	}

Contributor:
Ohh, got it.

		blob.remainingReadPermits--
		if blob.remainingReadPermits == 0 {
			table.remove(blob)
			removed = true
		}
	}

	return blob, removed
}
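
A hypothetical read-loop sketch showing how a reader might consume GetRandom's two return values; readBlob and the log line are illustrative and not part of this PR, and the fmt import already present in blob_table.go is reused:

// readLoop drains required reads from the table until it is empty.
func readLoop(table *BlobTable, readBlob func(*BlobMetadata) error) {
	for {
		metadata, removed := table.GetRandom(true)
		if metadata == nil {
			// The table is empty; nothing left to read.
			return
		}

		if err := readBlob(metadata); err != nil {
			// A real generator would log and count this failure; note that the
			// read permit has already been consumed at this point.
			continue
		}

		if removed {
			// The blob has exhausted its read permits and is no longer tracked.
			fmt.Printf("finished required reads for blob %x\n", metadata.Key())
		}
	}
}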

// remove a blob from the table.
func (table *BlobTable) remove(blob *BlobMetadata) {
Contributor:
Is there a particular reason why we need the table to be a slice?
I feel like removing/adding elements would be a lot simpler and less brittle if we used a map.

Contributor Author:
The primary reason why I use a slice is to give an O(1) implementation of GetRandom(). Can you think of a good way to do this backed by a map?

Contributor:
Do we need to access a random element from the slice?
For sampling, could we just use the first element from the map?

Contributor Author:
The entire reason why this complex data structure exists in the first place is to facilitate random access. 😜

For the blobs with a number of required reads: yes, we could get away without random access. But I'm under the impression a random access pattern is preferable to a fixed one when simulating workloads like this.

For the pool of optional blobs to read, I think random access is necessary. Otherwise, we'd just be reading the same blob over and over until we get a new blob to start reading.

I'm open to discussing this more in depth if you are not convinced by my reasoning.

Contributor (@ian-shim, Aug 1, 2024):
> For the blobs with a number of required reads: yes, we could get away without random access. But I'm under the impression a random access pattern is preferable to a fixed one when simulating workloads like this.

We could get semi-random access with a map in constant time (just generate a random number n < 10 and pick the nth element from the map) if the access doesn't require sampling from a uniform distribution.

> For the pool of optional blobs to read, I think random access is necessary. Otherwise, we'd just be reading the same blob over and over until we get a new blob to start reading.

I don't think optional blob reads were ever part of the spec. The primary goal for this observability tool is to monitor whether the network can handle a given retrieval traffic. Are there benefits to saturating the network with optional reads?

Not a big deal since you have it implemented already, but I would bias toward simplicity vs. optimization.

Contributor Author:
Refactored to use a map-based implementation.

	if table.blobs[blob.index] != blob {
		panic(fmt.Sprintf("blob %x is not present in the table at index %d", blob.Key(), blob.index))
Contributor:
Should we log at error level and handle this case gracefully vs. crashing the whole program?

Contributor Author:
This code was replaced with the simpler implementation.

	}

	if table.size == 1 {
		table.blobs[0] = nil
	} else {
		// Move the last blob to the position of the blob being removed.
		table.blobs[blob.index] = table.blobs[table.size-1]
		table.blobs[blob.index].index = blob.index
		table.blobs[table.size-1] = nil
	}
	table.size--
}
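
The map refactor mentioned above lands in a later commit that is not part of this diff. For context, here is a minimal, hypothetical sketch (not the code that eventually merged) of how a map-backed store can keep O(1) random sampling and O(1) removal by pairing the map with a dense key slice and a position index; it reuses the math/rand and sync imports already present in blob_table.go:

// mapBlobStore is a sketch of a map-backed alternative to BlobTable.
type mapBlobStore struct {
	lock sync.RWMutex

	// blobs holds the metadata keyed by the blob key (as a string).
	blobs map[string]*BlobMetadata

	// keys is a dense slice of the map's keys, enabling O(1) random sampling.
	keys []string

	// positions maps each key to its slot in keys, enabling O(1) removal.
	positions map[string]int
}

func newMapBlobStore() *mapBlobStore {
	return &mapBlobStore{
		blobs:     make(map[string]*BlobMetadata),
		positions: make(map[string]int),
	}
}

func (s *mapBlobStore) add(blob *BlobMetadata) {
	s.lock.Lock()
	defer s.lock.Unlock()

	k := string(blob.Key())
	if _, exists := s.blobs[k]; !exists {
		s.positions[k] = len(s.keys)
		s.keys = append(s.keys, k)
	}
	s.blobs[k] = blob
}

// getRandom returns a uniformly random blob, or nil if the store is empty.
func (s *mapBlobStore) getRandom() *BlobMetadata {
	s.lock.RLock()
	defer s.lock.RUnlock()

	if len(s.keys) == 0 {
		return nil
	}
	return s.blobs[s.keys[rand.Intn(len(s.keys))]]
}

// remove deletes a blob by key, swapping the last key into the vacated slot so
// that the key slice stays dense.
func (s *mapBlobStore) remove(key []byte) {
	s.lock.Lock()
	defer s.lock.Unlock()

	k := string(key)
	pos, exists := s.positions[k]
	if !exists {
		return
	}

	last := s.keys[len(s.keys)-1]
	s.keys[pos] = last
	s.positions[last] = pos
	s.keys = s.keys[:len(s.keys)-1]

	delete(s.blobs, k)
	delete(s.positions, k)
}
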
175 changes: 175 additions & 0 deletions tools/traffic/table/blob_table_test.go
@@ -0,0 +1,175 @@
package table

import (
	tu "github.com/Layr-Labs/eigenda/common/testutils"
	"github.com/stretchr/testify/assert"
	"golang.org/x/exp/rand"
	"testing"
)

// randomMetadata generates a random BlobMetadata instance.
func randomMetadata(permits int) *BlobMetadata {
	key := make([]byte, 32)
	checksum := [16]byte{}
	_, _ = rand.Read(key)
	_, _ = rand.Read(checksum[:])
	return NewBlobMetadata(key, &checksum, 1024, 0, permits)
}

// TestBasicOperation tests basic operations of the BlobTable. Adds blobs and iterates over them.
func TestBasicOperation(t *testing.T) {
	tu.InitializeRandom()

	table := NewBlobTable()
	assert.Equal(t, uint(0), table.Size())

	size := 1024
	expectedMetadata := make([]*BlobMetadata, 0)
	for i := 0; i < size; i++ {
		metadata := randomMetadata(1)
		table.Add(metadata)
		expectedMetadata = append(expectedMetadata, metadata)
		assert.Equal(t, uint(i+1), table.Size())
	}

	for i := 0; i < size; i++ {
		assert.Equal(t, expectedMetadata[i], table.Get(uint(i)))
	}

	// Requesting an index that is out of bounds should return nil.
	assert.Nil(t, table.Get(uint(size)))
}

// TestGetRandomNoRemovalByConfiguration tests getting random blob data where the number of permits per blob
// is unlimited.
func TestGetRandomNoRemovalByConfiguration(t *testing.T) {
	tu.InitializeRandom()

	table := NewBlobTable()
	assert.Equal(t, uint(0), table.Size())

	// Requesting a random element from an empty table should return nil.
	element, _ := table.GetRandom(true)
	assert.Nil(t, element)

	expectedMetadata := make([]*BlobMetadata, 0)
	size := 128
	for i := 0; i < size; i++ {
		metadata := randomMetadata(-1) // -1 == unlimited permits
		table.Add(metadata)
		expectedMetadata = append(expectedMetadata, metadata)
		assert.Equal(t, uint(i+1), table.Size())
	}

	randomIndices := make(map[uint]bool)

	// Query more times than the number of blobs to ensure that blobs are not removed.
	for i := 0; i < size*8; i++ {
		// This parameter will be ignored given that the number of permits is unlimited.
		// But it is not a bad thing to exercise the code path.
		decrement := rand.Intn(2) == 1

		metadata, removed := table.GetRandom(decrement)
		assert.False(t, removed)
		assert.NotNil(t, metadata)
		assert.Equal(t, expectedMetadata[metadata.index], metadata)

		randomIndices[metadata.index] = true
	}

	// Sanity check: ensure that at least 10 different blobs were returned. This check is attempting to verify
	// that we are actually getting random blobs. The probability of this check failing is extremely low if
	// the random number generator is working correctly.
	assert.GreaterOrEqual(t, len(randomIndices), 10)
}

// TestGetRandomWithRemoval tests getting random blob data where the number of permits per blob is limited.
func TestGetRandomWithRemoval(t *testing.T) {
	tu.InitializeRandom()

	table := NewBlobTable()
	assert.Equal(t, uint(0), table.Size())

	// Requesting a random element from an empty table should return nil.
	element, _ := table.GetRandom(true)
	assert.Nil(t, element)

	permitCount := 2

	size := 1024
	expectedMetadata := make(map[string]uint)
	for i := 0; i < size; i++ {
		metadata := randomMetadata(permitCount)
		table.Add(metadata)
		expectedMetadata[string(metadata.Key())] = 0
		assert.Equal(t, uint(i+1), table.Size())
	}

	// Requesting random elements without decrementing should not remove any elements.
	for i := 0; i < size; i++ {
		metadata, removed := table.GetRandom(false)
		assert.NotNil(t, metadata)
		_, exists := expectedMetadata[string(metadata.Key())]
		assert.True(t, exists)
		assert.False(t, removed)
	}
	assert.Equal(t, uint(size), table.Size())

	// Requesting elements a number of times equal to the size times the number of permits should completely
	// drain the table and return all elements a number of times equal to the number of permits.
	for i := 0; i < size*permitCount; i++ {
		metadata, removed := table.GetRandom(true)
		assert.NotNil(t, metadata)

		k := string(metadata.Key())
		permitsUsed := expectedMetadata[k] + 1
		expectedMetadata[k] = permitsUsed
		assert.LessOrEqual(t, permitsUsed, uint(permitCount))

		if int(permitsUsed) == permitCount {
			assert.True(t, removed)
		} else {
			assert.False(t, removed)
		}
	}

	assert.Equal(t, uint(0), table.Size())
}

// TestAddOrReplace tests adding blobs to a table with a maximum capacity. The table should replace blobs when full.
func TestAddOrReplace(t *testing.T) {
	tu.InitializeRandom()

	table := NewBlobTable()
	assert.Equal(t, uint(0), table.Size())

	// Adding data to a table with capacity 0 should be a no-op.
	table.AddOrReplace(randomMetadata(1), 0)
	assert.Equal(t, uint(0), table.Size())

	randomIndices := make(map[uint]bool)

	size := 1024
	for i := 0; i < size*2; i++ {
		metadata := randomMetadata(-1) // -1 == unlimited permits

		initialSize := table.Size()
		table.AddOrReplace(metadata, uint(size))
		resultingSize := table.Size()

		assert.LessOrEqual(t, resultingSize, uint(size))
		if initialSize < uint(size) {
			assert.Equal(t, initialSize+1, resultingSize)
		} else {
			randomIndices[metadata.index] = true
		}

		// Verify that the metadata is in the table.
		assert.Less(t, metadata.index, table.Size())
		assert.Equal(t, metadata, table.Get(metadata.index))
	}

	// Sanity check: ensure that replacements happened at 10 or more different indices. This check is attempting
	// to verify that we are actually replacing blobs. The probability of this check failing is extremely low if
	// the random number generator is working correctly.
	assert.GreaterOrEqual(t, len(randomIndices), 10)
}