From 3089bb0ac72ef919fe911eb22d9f2c9ac8b57faf Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Thu, 18 Jul 2024 15:41:06 -0500 Subject: [PATCH] Refactor blob writer. Signed-off-by: Cody Littley --- tools/traffic/blob_reader.go | 1 + tools/traffic/blob_writer.go | 124 ++++++++++++++++++++++++++++++++ tools/traffic/generator.go | 90 +++-------------------- tools/traffic/generator_test.go | 4 +- 4 files changed, 135 insertions(+), 84 deletions(-) create mode 100644 tools/traffic/blob_reader.go create mode 100644 tools/traffic/blob_writer.go diff --git a/tools/traffic/blob_reader.go b/tools/traffic/blob_reader.go new file mode 100644 index 000000000..afb546e82 --- /dev/null +++ b/tools/traffic/blob_reader.go @@ -0,0 +1 @@ +package traffic diff --git a/tools/traffic/blob_writer.go b/tools/traffic/blob_writer.go new file mode 100644 index 000000000..a35a177e0 --- /dev/null +++ b/tools/traffic/blob_writer.go @@ -0,0 +1,124 @@ +package traffic + +import ( + "context" + "crypto/rand" + "encoding/hex" + "github.com/Layr-Labs/eigenda/encoding/utils/codec" + "sync" + "time" +) + +// TODO document + +// BlobWriter sends blobs to a disperser at a configured rate. +type BlobWriter struct { + ctx *context.Context + waitGroup *sync.WaitGroup + generator *TrafficGenerator + verifier *StatusVerifier + + // fixedRandomData contains random data for blobs if RandomizeBlobs is false, and nil otherwise. + fixedRandomData *[]byte +} + +// NewBlobWriter creates a new BlobWriter instance. +func NewBlobWriter( + ctx *context.Context, + waitGroup *sync.WaitGroup, + generator *TrafficGenerator, + verifier *StatusVerifier) BlobWriter { + + var fixedRandomData []byte + if generator.Config.RandomizeBlobs { + // New random data will be generated for each blob. + fixedRandomData = nil + } else { + // Use this random data for each blob. + fixedRandomData := make([]byte, generator.Config.DataSize) + _, err := rand.Read(fixedRandomData) + if err != nil { + panic(err) + } + fixedRandomData = codec.ConvertByPaddingEmptyByte(fixedRandomData) + } + + return BlobWriter{ + ctx: ctx, + waitGroup: waitGroup, + generator: generator, + verifier: verifier, + fixedRandomData: &fixedRandomData, + } +} + +// Start begins the blob writer goroutine. +func (writer *BlobWriter) Start() { + writer.waitGroup.Add(1) + go func() { + writer.run() + writer.waitGroup.Done() + }() +} + +// run sends blobs to a disperser at a configured rate. +// Continues and dues not return until the context is cancelled. +func (writer *BlobWriter) run() { + ticker := time.NewTicker(writer.generator.Config.WriteRequestInterval) + for { + select { + case <-(*writer.ctx).Done(): + return + case <-ticker.C: + key, err := writer.sendRequest(*writer.getRandomData()) + + if err != nil { + writer.generator.Logger.Error("failed to send blob request", "err:", err) + continue + } + + writer.verifier.AddUnconfirmedKey(&key) + } + } +} + +// getRandomData returns a slice of random data to be used for a blob. +func (writer *BlobWriter) getRandomData() *[]byte { + if *writer.fixedRandomData != nil { + return writer.fixedRandomData + } + + data := make([]byte, writer.generator.Config.DataSize) + _, err := rand.Read(data) + if err != nil { + panic(err) + } + data = codec.ConvertByPaddingEmptyByte(data) + + return &data +} + +// sendRequest sends a blob to a disperser. +func (writer *BlobWriter) sendRequest(data []byte) ([]byte /* key */, error) { + ctxTimeout, cancel := context.WithTimeout(*writer.ctx, writer.generator.Config.Timeout) + defer cancel() + + if writer.generator.Config.SignerPrivateKey != "" { + blobStatus, key, err := + writer.generator.DisperserClient.DisperseBlobAuthenticated(ctxTimeout, data, writer.generator.Config.CustomQuorums) + if err != nil { + return nil, err + } + + writer.generator.Logger.Info("successfully dispersed new blob", "authenticated", true, "key", hex.EncodeToString(key), "status", blobStatus.String()) + return key, nil + } else { + blobStatus, key, err := writer.generator.DisperserClient.DisperseBlob(ctxTimeout, data, writer.generator.Config.CustomQuorums) + if err != nil { + return nil, err + } + + writer.generator.Logger.Info("successfully dispersed new blob", "authenticated", false, "key", hex.EncodeToString(key), "status", blobStatus.String()) + return key, nil + } +} diff --git a/tools/traffic/generator.go b/tools/traffic/generator.go index 780899f5a..88779ed63 100644 --- a/tools/traffic/generator.go +++ b/tools/traffic/generator.go @@ -2,9 +2,6 @@ package traffic import ( "context" - "crypto/rand" - "encoding/hex" - "fmt" "os" "os/signal" "sync" @@ -14,7 +11,6 @@ import ( "github.com/Layr-Labs/eigenda/api/clients" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/core" - "github.com/Layr-Labs/eigenda/encoding/utils/codec" "github.com/Layr-Labs/eigensdk-go/logging" ) @@ -41,13 +37,16 @@ func NewTrafficGenerator(config *Config, signer core.BlobRequestSigner) (*Traffi // Run instantiates goroutines that generate read/write traffic, continues until a SIGTERM is observed. func (g *TrafficGenerator) Run() error { ctx, cancel := context.WithCancel(context.Background()) + + // TODO add configuration + table := NewBlobTable() + verifier := NewStatusVerifier(&table, &g.DisperserClient, -1) + verifier.Start(ctx, time.Second) + var wg sync.WaitGroup for i := 0; i < int(g.Config.NumWriteInstances); i++ { - wg.Add(1) - go func() { - defer wg.Done() - _ = g.StartWriteWorker(ctx) - }() + writer := NewBlobWriter(&ctx, &wg, g, &verifier) + writer.Start() time.Sleep(g.Config.InstanceLaunchInterval) } signals := make(chan os.Signal, 1) @@ -79,76 +78,3 @@ func (g *TrafficGenerator) StartReadWorker(ctx context.Context) error { func (g *TrafficGenerator) readRequest() { // TODO } - -// StartWriteWorker periodically sends (possibly) random blobs to a disperser at a configured rate. -func (g *TrafficGenerator) StartWriteWorker(ctx context.Context) error { - data := make([]byte, g.Config.DataSize) - _, err := rand.Read(data) - if err != nil { - return err - } - - // TODO configuration for this stuff - var table BlobTable = NewBlobTable() - var verifier StatusVerifier = NewStatusVerifier(&table, &g.DisperserClient, -1) - verifier.Start(ctx, time.Second) - - paddedData := codec.ConvertByPaddingEmptyByte(data) - - ticker := time.NewTicker(g.Config.WriteRequestInterval) - for { - select { - case <-ctx.Done(): - return nil - case <-ticker.C: - var key []byte - if g.Config.RandomizeBlobs { - _, err := rand.Read(data) - if err != nil { - return err - } - paddedData = codec.ConvertByPaddingEmptyByte(data) - - key, err = g.sendRequest(ctx, paddedData[:g.Config.DataSize]) - - if err != nil { - g.Logger.Error("failed to send blob request", "err:", err) - } - paddedData = nil - } else { - key, err = g.sendRequest(ctx, paddedData[:g.Config.DataSize]) - if err != nil { - g.Logger.Error("failed to send blob request", "err:", err) - } - } - - fmt.Println("passing key to verifier") // TODO remove - verifier.AddUnconfirmedKey(&key) - fmt.Println("done passing key") // TODO remove - } - } -} - -// sendRequest sends a blob to a disperser. -func (g *TrafficGenerator) sendRequest(ctx context.Context, data []byte) ([]byte /* key */, error) { - ctxTimeout, cancel := context.WithTimeout(ctx, g.Config.Timeout) - defer cancel() - - if g.Config.SignerPrivateKey != "" { - blobStatus, key, err := g.DisperserClient.DisperseBlobAuthenticated(ctxTimeout, data, g.Config.CustomQuorums) - if err != nil { - return nil, err - } - - g.Logger.Info("successfully dispersed new blob", "authenticated", true, "key", hex.EncodeToString(key), "status", blobStatus.String()) - return key, nil - } else { - blobStatus, key, err := g.DisperserClient.DisperseBlob(ctxTimeout, data, g.Config.CustomQuorums) - if err != nil { - return nil, err - } - - g.Logger.Info("successfully dispersed new blob", "authenticated", false, "key", hex.EncodeToString(key), "status", blobStatus.String()) - return key, nil - } -} diff --git a/tools/traffic/generator_test.go b/tools/traffic/generator_test.go index b2ef6abb1..05983e9b3 100644 --- a/tools/traffic/generator_test.go +++ b/tools/traffic/generator_test.go @@ -34,7 +34,7 @@ func TestTrafficGenerator(t *testing.T) { Return(&processing, []byte{1}, nil) ctx, cancel := context.WithCancel(context.Background()) go func() { - _ = trafficGenerator.StartWriteWorker(ctx) + _ = trafficGenerator.StartBlobWriter(ctx) }() time.Sleep(5 * time.Second) cancel() @@ -63,7 +63,7 @@ func TestTrafficGeneratorAuthenticated(t *testing.T) { Return(&processing, []byte{1}, nil) ctx, cancel := context.WithCancel(context.Background()) go func() { - _ = trafficGenerator.StartWriteWorker(ctx) + _ = trafficGenerator.StartBlobWriter(ctx) }() time.Sleep(5 * time.Second) cancel()