Skip to content

Commit

Permalink
Incremental progress.
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley committed Jul 18, 2024
1 parent 8191b7f commit fff7c76
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 3 deletions.
54 changes: 54 additions & 0 deletions tools/traffic/blob_metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package traffic

// BlobMetadata encapsulates various information about a blob written by the traffic generator.
type BlobMetadata struct {
// key of the blob, set when the blob is initially uploaded.
key *[]byte

// batchHeaderHash of the blob.
batchHeaderHash *[]byte

// blobIndex of the blob.
blobIndex uint32

// remainingReadPermits describes the maximum number of remaining reads permitted against this blob.
// If -1 then an unlimited number of reads are permitted.
remainingReadPermits int32

// index describes the position of this blob within the blobTable.
index uint32
}

// NewBlobMetadata creates a new BlobMetadata instance. The readPermits parameter describes the maximum number of
// remaining reads permitted against this blob. If -1 then an unlimited number of reads are permitted.
func NewBlobMetadata(
key *[]byte,
batchHeaderHash *[]byte,
blobIndex uint32,
readPermits int32) *BlobMetadata {

return &BlobMetadata{
key: key,
batchHeaderHash: batchHeaderHash,
blobIndex: blobIndex,
remainingReadPermits: readPermits,
index: 0,
}
}

// Key returns the key of the blob.
func (blob *BlobMetadata) Key() *[]byte {
return blob.key
}

// BatchHeaderHash returns the batchHeaderHash of the blob.
func (blob *BlobMetadata) BatchHeaderHash() *[]byte {
return blob.batchHeaderHash
}

// BlobIndex returns the blobIndex of the blob.
func (blob *BlobMetadata) BlobIndex() uint32 {
return blob.blobIndex
}

// TODO method for decrementing read permits
91 changes: 91 additions & 0 deletions tools/traffic/blob_table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package traffic

import (
"fmt"
"math/rand"
"sync"
)

// BlobTable tracks blobs written by the traffic generator. This is a thread safe data structure.
type BlobTable struct {

// blobs contains all blobs currently tracked by the table.
blobs []*BlobMetadata

// size describes the total number of blobs currently tracked by the table.
// size may be smaller than the capacity of the blobs slice.
size uint32

// lock is used to synchronize access to the table.
lock sync.Mutex
}

// NewBlobTable creates a new BlobTable instance.
func NewBlobTable() BlobTable {
return BlobTable{
blobs: make([]*BlobMetadata, 1024),
size: 0,
}
}

// Size returns the total number of blobs currently tracked by the table.
func (table *BlobTable) Size() uint32 {
table.lock.Lock()
defer table.lock.Unlock()

return table.size
}

// Add a blob to the table.
func (table *BlobTable) Add(blob *BlobMetadata) {
table.lock.Lock()
defer table.lock.Unlock()

if table.size == uint32(len(table.blobs)) {
panic(fmt.Sprintf("blob table is full, cannot add blob %x", blob.Key))
}

blob.index = table.size
table.blobs[table.size] = blob
table.size++
}

// GetRandom returns a random blob currently tracked by the table. Returns nil if the table is empty.
// Optionally decrements the read permits of the blob if decrement is true. If the number of read permits
// reaches 0, the blob is removed from the table.
func (table *BlobTable) GetRandom(decrement bool) *BlobMetadata {
table.lock.Lock()
defer table.lock.Unlock()

if table.size == 0 {
return nil
}

blob := table.blobs[rand.Int31n(int32(table.size))] // TODO make sure we can get items if we overflow an int32

if decrement && blob.remainingReadPermits != -1 {
blob.remainingReadPermits--
if blob.remainingReadPermits == 0 {
table.remove(blob)
}
}

return blob
}

// remove a blob from the table.
func (table *BlobTable) remove(blob *BlobMetadata) {
if table.blobs[blob.index] != blob {
panic(fmt.Sprintf("blob %x is not not present in the table at index %d", blob.Key, blob.index))
}

if table.size == 1 {
table.blobs[0] = nil
} else {
// Move the last blob to the position of the blob being removed.
table.blobs[blob.index] = table.blobs[table.size-1]
table.blobs[blob.index].index = blob.index
table.blobs[table.size-1] = nil
}
table.size--
}
2 changes: 1 addition & 1 deletion tools/traffic/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Config struct {
NumWriteInstances uint
// The period of the submission rate of new blobs for each write worker thread.
WriteRequestInterval time.Duration
// The size of each blob dispersed, in bytes.
// The Size of each blob dispersed, in bytes.
DataSize uint64
// If true, then each blob will contain unique random data. If false, the same random data
// will be dispersed for each blob by a particular worker thread.
Expand Down
15 changes: 13 additions & 2 deletions tools/traffic/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"os"
"os/signal"
"sync"
Expand Down Expand Up @@ -87,6 +88,11 @@ func (g *TrafficGenerator) StartWriteWorker(ctx context.Context) error {
return err
}

// TODO configuration for this stuff
var table BlobTable = NewBlobTable()
var verifier StatusVerifier = NewStatusVerifier(&table, &g.DisperserClient, -1)
verifier.Start(ctx, time.Second)

paddedData := codec.ConvertByPaddingEmptyByte(data)

ticker := time.NewTicker(g.Config.WriteRequestInterval)
Expand All @@ -95,25 +101,30 @@ func (g *TrafficGenerator) StartWriteWorker(ctx context.Context) error {
case <-ctx.Done():
return nil
case <-ticker.C:
var key []byte
if g.Config.RandomizeBlobs {
_, err := rand.Read(data)
if err != nil {
return err
}
paddedData = codec.ConvertByPaddingEmptyByte(data)

_, err = g.sendRequest(ctx, paddedData[:g.Config.DataSize])
key, err = g.sendRequest(ctx, paddedData[:g.Config.DataSize])

if err != nil {
g.Logger.Error("failed to send blob request", "err:", err)
}
paddedData = nil
} else {
_, err = g.sendRequest(ctx, paddedData[:g.Config.DataSize])
key, err = g.sendRequest(ctx, paddedData[:g.Config.DataSize])
if err != nil {
g.Logger.Error("failed to send blob request", "err:", err)
}
}

fmt.Println("passing key to verifier") // TODO remove
verifier.AddUnconfirmedKey(&key)
fmt.Println("done passing key") // TODO remove
}
}
}
Expand Down
119 changes: 119 additions & 0 deletions tools/traffic/status_verifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package traffic

import (
"context"
"fmt"
"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/api/grpc/disperser"
"time"
)

// StatusVerifier periodically polls the disperser service to verify the status of blobs that were recently written.
// When blobs become confirmed, the status verifier updates the blob table accordingly.
// This is a thread safe data structure.
type StatusVerifier struct {

// A table of confirmed blobs. Blobs are added here when they are confirmed by the disperser service.
table *BlobTable

// The maximum number of reads permitted against an individual blob, or -1 if unlimited.
blobReadLimit int32

// The disperser client used to monitor the disperser service.
dispenser *clients.DisperserClient

// The keys of blobs that have not yet been confirmed by the disperser service.
unconfirmedKeys []*[]byte

// Newly added keys that require verification.
keyChannel chan *[]byte
}

// NewStatusVerifier creates a new StatusVerifier instance.
func NewStatusVerifier(
table *BlobTable,
disperser *clients.DisperserClient,
blobReadLimit int32) StatusVerifier {

return StatusVerifier{
table: table,
blobReadLimit: blobReadLimit,
dispenser: disperser,
unconfirmedKeys: make([]*[]byte, 0),
keyChannel: make(chan *[]byte),
}
}

// AddUnconfirmedKey adds a key to the list of unconfirmed keys.
func (verifier *StatusVerifier) AddUnconfirmedKey(key *[]byte) {
fmt.Println("Adding unconfirmed key") // TODO remove
verifier.keyChannel <- key
fmt.Println("Finished adding unconfirmed key") // TODO remove
}

// Start begins the status goroutine, which periodically polls
// the disperser service to verify the status of blobs.
func (verifier *StatusVerifier) Start(ctx context.Context, period time.Duration) {
go verifier.monitor(ctx, period)
}

// monitor periodically polls the disperser service to verify the status of blobs.
func (verifier *StatusVerifier) monitor(ctx context.Context, period time.Duration) {
fmt.Println("::: Starting status verifier :::") // TODO remove

ticker := time.NewTicker(period)
for {
select {
case <-ctx.Done():
return
case key := <-verifier.keyChannel:
fmt.Println("Got unconfirmed key") // TODO remove
verifier.unconfirmedKeys = append(verifier.unconfirmedKeys, key)
case <-ticker.C:
fmt.Println("polling") // TODO remove
verifier.poll(ctx)
fmt.Println("done polling") // TODO remove
}
}
}

// poll checks all unconfirmed keys to see if they have been confirmed by the disperser service.
// If a key is confirmed, it is added to the blob table and removed from the list of unconfirmed keys.
func (verifier *StatusVerifier) poll(ctx context.Context) {
unconfirmedKeys := make([]*[]byte, 0)
for _, key := range verifier.unconfirmedKeys {
confirmed := verifier.checkStatusForBlob(ctx, key)
if !confirmed {
unconfirmedKeys = append(unconfirmedKeys, key)
}
}
verifier.unconfirmedKeys = unconfirmedKeys
}

// checkStatusForBlob checks the status of a blob. Returns true if the blob is confirmed, false otherwise.
func (verifier *StatusVerifier) checkStatusForBlob(ctx context.Context, key *[]byte) bool {
status, err := (*verifier.dispenser).GetBlobStatus(ctx, *key)

if err != nil {
fmt.Println("Error getting blob status:", err) // TODO is this proper?
return false
}

// TODO other statuses?
if status.GetStatus() == disperser.BlobStatus_CONFIRMED {

fmt.Println(">>>>>>>>>>>>>>>>>>>>>> Confirmed key", key) // TODO remove

batchHeaderHash := status.GetInfo().BlobVerificationProof.BatchMetadata.BatchHeaderHash
blobIndex := status.GetInfo().BlobVerificationProof.GetBlobIndex()

blobMetadata := NewBlobMetadata(key, &batchHeaderHash, blobIndex, -1) // TODO permits
verifier.table.Add(blobMetadata)

return true
} else {
fmt.Println("-------------- key not yet confirmed") // TODO remove
}

return false
}

0 comments on commit fff7c76

Please sign in to comment.