Skip to content

Commit

Permalink
Add ability to read from disperser.
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley committed Jul 19, 2024
1 parent 3089bb0 commit 463f88f
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 31 deletions.
80 changes: 80 additions & 0 deletions tools/traffic/blob_reader.go
Original file line number Diff line number Diff line change
@@ -1 +1,81 @@
package traffic

import (
"context"
"fmt"
"sync"
"time"
)

// BlobReader reads blobs from a disperser at a configured rate.
type BlobReader struct {
// The context for the generator. All work should cease when this context is cancelled.
ctx *context.Context

// Tracks the number of active goroutines within the generator.
waitGroup *sync.WaitGroup

// TODO this type should be refactored maybe
generator *TrafficGenerator

// table of blobs to read from.
table *BlobTable
}

// NewBlobReader creates a new BlobReader instance.
func NewBlobReader(
ctx *context.Context,
waitGroup *sync.WaitGroup,
generator *TrafficGenerator,
table *BlobTable) BlobReader {

return BlobReader{
ctx: ctx,
waitGroup: waitGroup,
generator: generator,
table: table,
}
}

// Start begins a blob reader goroutine.
func (reader *BlobReader) Start() {
reader.waitGroup.Add(1)
go func() {
defer reader.waitGroup.Done()
reader.run()
}()
}

// run periodically performs reads on blobs.
func (reader *BlobReader) run() {
ticker := time.NewTicker(time.Second) // TODO setting
for {
select {
case <-(*reader.ctx).Done():
return
case <-ticker.C:
reader.randomRead()
}
}
}

// randomRead reads a random blob.
func (reader *BlobReader) randomRead() {

metadata := reader.table.GetRandom(true)
if metadata == nil {
// There are no blobs to read, do nothing.
return
}

// TODO convert this to a proper read
data, err := reader.generator.DisperserClient.RetrieveBlob(*reader.ctx, *metadata.batchHeaderHash, metadata.blobIndex)
if err != nil {
fmt.Println("Error reading blob:", err) // TODO
return
}

// TODO it would be nice to do some verification, perhaps just of the hash of the blob

fmt.Println("Read blob:", data) // TODO
}
11 changes: 9 additions & 2 deletions tools/traffic/blob_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,17 @@ import (

// BlobWriter sends blobs to a disperser at a configured rate.
type BlobWriter struct {
ctx *context.Context
// The context for the generator. All work should cease when this context is cancelled.
ctx *context.Context

// Tracks the number of active goroutines within the generator.
waitGroup *sync.WaitGroup

// TODO this type should be refactored maybe
generator *TrafficGenerator
verifier *StatusVerifier

// Responsible for polling on the status of a recently written blob until it becomes confirmed.
verifier *StatusVerifier

// fixedRandomData contains random data for blobs if RandomizeBlobs is false, and nil otherwise.
fixedRandomData *[]byte
Expand Down
43 changes: 22 additions & 21 deletions tools/traffic/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,22 @@ import (
"github.com/Layr-Labs/eigensdk-go/logging"
)

// TrafficGenerator simulates read/write traffic to the DA service.
//
// ┌------------┐ ┌------------┐
// | writer |-┐ ┌------------┐ | reader |-┐
// └------------┘ |-┐ -------> | verifier | -------> └------------┘ |-┐
// └------------┘ | └------------┘ └------------┘ |
// └------------┘ └------------┘
//
// The traffic generator is built from three principal components: one or more writers
// that write blobs, a verifier that polls the dispenser service until blobs are confirmed,
// and one or more readers that read blobs.
//
// When a writer finishes writing a blob, it
// sends information about that blob to the verifier. When the verifier observes that a blob
// has been confirmed, it sends information about the blob to the readers. The readers
// only attempt to read blobs that have been confirmed by the verifier.
type TrafficGenerator struct {
Logger logging.Logger
DisperserClient clients.DisperserClient
Expand Down Expand Up @@ -44,11 +60,17 @@ func (g *TrafficGenerator) Run() error {
verifier.Start(ctx, time.Second)

var wg sync.WaitGroup

for i := 0; i < int(g.Config.NumWriteInstances); i++ {
writer := NewBlobWriter(&ctx, &wg, g, &verifier)
writer.Start()
time.Sleep(g.Config.InstanceLaunchInterval)
}

// TODO start multiple readers
reader := NewBlobReader(&ctx, &wg, g, &table)
reader.Start()

signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
<-signals
Expand All @@ -57,24 +79,3 @@ func (g *TrafficGenerator) Run() error {
wg.Wait()
return nil
}

// TODO maybe split reader/writer into separate files

// StartReadWorker periodically requests to download random blobs at a configured rate.
func (g *TrafficGenerator) StartReadWorker(ctx context.Context) error {
ticker := time.NewTicker(g.Config.WriteRequestInterval)
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
// TODO determine which blob to download
g.readRequest() // TODO add parameters
}
}
}

// readRequest reads a blob.
func (g *TrafficGenerator) readRequest() {
// TODO
}
9 changes: 1 addition & 8 deletions tools/traffic/status_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ func NewStatusVerifier(

// AddUnconfirmedKey adds a key to the list of unconfirmed keys.
func (verifier *StatusVerifier) AddUnconfirmedKey(key *[]byte) {
fmt.Println("Adding unconfirmed key") // TODO remove
verifier.keyChannel <- key
fmt.Println("Finished adding unconfirmed key") // TODO remove
}

// Start begins the status goroutine, which periodically polls
Expand All @@ -59,20 +57,15 @@ func (verifier *StatusVerifier) Start(ctx context.Context, period time.Duration)

// monitor periodically polls the disperser service to verify the status of blobs.
func (verifier *StatusVerifier) monitor(ctx context.Context, period time.Duration) {
fmt.Println("::: Starting status verifier :::") // TODO remove

ticker := time.NewTicker(period)
for {
select {
case <-ctx.Done():
return
case key := <-verifier.keyChannel:
fmt.Println("Got unconfirmed key") // TODO remove
verifier.unconfirmedKeys = append(verifier.unconfirmedKeys, key)
case <-ticker.C:
fmt.Println("polling") // TODO remove
verifier.poll(ctx)
fmt.Println("done polling") // TODO remove
}
}
}
Expand All @@ -99,7 +92,7 @@ func (verifier *StatusVerifier) checkStatusForBlob(ctx context.Context, key *[]b
return false
}

// TODO other statuses?
// TODO other statuses
if status.GetStatus() == disperser.BlobStatus_CONFIRMED {

fmt.Println(">>>>>>>>>>>>>>>>>>>>>> Confirmed key", key) // TODO remove
Expand Down

0 comments on commit 463f88f

Please sign in to comment.