diff --git a/tools/traffic/blob_reader.go b/tools/traffic/blob_reader.go index afb546e82..0be931a29 100644 --- a/tools/traffic/blob_reader.go +++ b/tools/traffic/blob_reader.go @@ -1 +1,81 @@ package traffic + +import ( + "context" + "fmt" + "sync" + "time" +) + +// BlobReader reads blobs from a disperser at a configured rate. +type BlobReader struct { + // The context for the generator. All work should cease when this context is cancelled. + ctx *context.Context + + // Tracks the number of active goroutines within the generator. + waitGroup *sync.WaitGroup + + // TODO this type should be refactored maybe + generator *TrafficGenerator + + // table of blobs to read from. + table *BlobTable +} + +// NewBlobReader creates a new BlobReader instance. +func NewBlobReader( + ctx *context.Context, + waitGroup *sync.WaitGroup, + generator *TrafficGenerator, + table *BlobTable) BlobReader { + + return BlobReader{ + ctx: ctx, + waitGroup: waitGroup, + generator: generator, + table: table, + } +} + +// Start begins a blob reader goroutine. +func (reader *BlobReader) Start() { + reader.waitGroup.Add(1) + go func() { + defer reader.waitGroup.Done() + reader.run() + }() +} + +// run periodically performs reads on blobs. +func (reader *BlobReader) run() { + ticker := time.NewTicker(time.Second) // TODO setting + for { + select { + case <-(*reader.ctx).Done(): + return + case <-ticker.C: + reader.randomRead() + } + } +} + +// randomRead reads a random blob. +func (reader *BlobReader) randomRead() { + + metadata := reader.table.GetRandom(true) + if metadata == nil { + // There are no blobs to read, do nothing. + return + } + + // TODO convert this to a proper read + data, err := reader.generator.DisperserClient.RetrieveBlob(*reader.ctx, *metadata.batchHeaderHash, metadata.blobIndex) + if err != nil { + fmt.Println("Error reading blob:", err) // TODO + return + } + + // TODO it would be nice to do some verification, perhaps just of the hash of the blob + + fmt.Println("Read blob:", data) // TODO +} diff --git a/tools/traffic/blob_writer.go b/tools/traffic/blob_writer.go index a35a177e0..ab2e0d842 100644 --- a/tools/traffic/blob_writer.go +++ b/tools/traffic/blob_writer.go @@ -13,10 +13,17 @@ import ( // BlobWriter sends blobs to a disperser at a configured rate. type BlobWriter struct { - ctx *context.Context + // The context for the generator. All work should cease when this context is cancelled. + ctx *context.Context + + // Tracks the number of active goroutines within the generator. waitGroup *sync.WaitGroup + + // TODO this type should be refactored maybe generator *TrafficGenerator - verifier *StatusVerifier + + // Responsible for polling on the status of a recently written blob until it becomes confirmed. + verifier *StatusVerifier // fixedRandomData contains random data for blobs if RandomizeBlobs is false, and nil otherwise. fixedRandomData *[]byte diff --git a/tools/traffic/generator.go b/tools/traffic/generator.go index 88779ed63..85b073d33 100644 --- a/tools/traffic/generator.go +++ b/tools/traffic/generator.go @@ -14,6 +14,22 @@ import ( "github.com/Layr-Labs/eigensdk-go/logging" ) +// TrafficGenerator simulates read/write traffic to the DA service. +// +// ┌------------┐ ┌------------┐ +// | writer |-┐ ┌------------┐ | reader |-┐ +// └------------┘ |-┐ -------> | verifier | -------> └------------┘ |-┐ +// └------------┘ | └------------┘ └------------┘ | +// └------------┘ └------------┘ +// +// The traffic generator is built from three principal components: one or more writers +// that write blobs, a verifier that polls the dispenser service until blobs are confirmed, +// and one or more readers that read blobs. +// +// When a writer finishes writing a blob, it +// sends information about that blob to the verifier. When the verifier observes that a blob +// has been confirmed, it sends information about the blob to the readers. The readers +// only attempt to read blobs that have been confirmed by the verifier. type TrafficGenerator struct { Logger logging.Logger DisperserClient clients.DisperserClient @@ -44,11 +60,17 @@ func (g *TrafficGenerator) Run() error { verifier.Start(ctx, time.Second) var wg sync.WaitGroup + for i := 0; i < int(g.Config.NumWriteInstances); i++ { writer := NewBlobWriter(&ctx, &wg, g, &verifier) writer.Start() time.Sleep(g.Config.InstanceLaunchInterval) } + + // TODO start multiple readers + reader := NewBlobReader(&ctx, &wg, g, &table) + reader.Start() + signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt, syscall.SIGTERM) <-signals @@ -57,24 +79,3 @@ func (g *TrafficGenerator) Run() error { wg.Wait() return nil } - -// TODO maybe split reader/writer into separate files - -// StartReadWorker periodically requests to download random blobs at a configured rate. -func (g *TrafficGenerator) StartReadWorker(ctx context.Context) error { - ticker := time.NewTicker(g.Config.WriteRequestInterval) - for { - select { - case <-ctx.Done(): - return nil - case <-ticker.C: - // TODO determine which blob to download - g.readRequest() // TODO add parameters - } - } -} - -// readRequest reads a blob. -func (g *TrafficGenerator) readRequest() { - // TODO -} diff --git a/tools/traffic/status_verifier.go b/tools/traffic/status_verifier.go index 9ce9ba424..aa4709549 100644 --- a/tools/traffic/status_verifier.go +++ b/tools/traffic/status_verifier.go @@ -46,9 +46,7 @@ func NewStatusVerifier( // AddUnconfirmedKey adds a key to the list of unconfirmed keys. func (verifier *StatusVerifier) AddUnconfirmedKey(key *[]byte) { - fmt.Println("Adding unconfirmed key") // TODO remove verifier.keyChannel <- key - fmt.Println("Finished adding unconfirmed key") // TODO remove } // Start begins the status goroutine, which periodically polls @@ -59,20 +57,15 @@ func (verifier *StatusVerifier) Start(ctx context.Context, period time.Duration) // monitor periodically polls the disperser service to verify the status of blobs. func (verifier *StatusVerifier) monitor(ctx context.Context, period time.Duration) { - fmt.Println("::: Starting status verifier :::") // TODO remove - ticker := time.NewTicker(period) for { select { case <-ctx.Done(): return case key := <-verifier.keyChannel: - fmt.Println("Got unconfirmed key") // TODO remove verifier.unconfirmedKeys = append(verifier.unconfirmedKeys, key) case <-ticker.C: - fmt.Println("polling") // TODO remove verifier.poll(ctx) - fmt.Println("done polling") // TODO remove } } } @@ -99,7 +92,7 @@ func (verifier *StatusVerifier) checkStatusForBlob(ctx context.Context, key *[]b return false } - // TODO other statuses? + // TODO other statuses if status.GetStatus() == disperser.BlobStatus_CONFIRMED { fmt.Println(">>>>>>>>>>>>>>>>>>>>>> Confirmed key", key) // TODO remove