Skip to content

Commit

Permalink
Refactor blob writer.
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley committed Jul 18, 2024
1 parent fff7c76 commit 3089bb0
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 84 deletions.
1 change: 1 addition & 0 deletions tools/traffic/blob_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package traffic
124 changes: 124 additions & 0 deletions tools/traffic/blob_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package traffic

import (
"context"
"crypto/rand"
"encoding/hex"
"github.com/Layr-Labs/eigenda/encoding/utils/codec"
"sync"
"time"
)

// TODO document

// BlobWriter sends blobs to a disperser at a configured rate.
type BlobWriter struct {
ctx *context.Context
waitGroup *sync.WaitGroup
generator *TrafficGenerator
verifier *StatusVerifier

// fixedRandomData contains random data for blobs if RandomizeBlobs is false, and nil otherwise.
fixedRandomData *[]byte
}

// NewBlobWriter creates a new BlobWriter instance.
func NewBlobWriter(
ctx *context.Context,
waitGroup *sync.WaitGroup,
generator *TrafficGenerator,
verifier *StatusVerifier) BlobWriter {

var fixedRandomData []byte
if generator.Config.RandomizeBlobs {
// New random data will be generated for each blob.
fixedRandomData = nil
} else {
// Use this random data for each blob.
fixedRandomData := make([]byte, generator.Config.DataSize)
_, err := rand.Read(fixedRandomData)
if err != nil {
panic(err)
}
fixedRandomData = codec.ConvertByPaddingEmptyByte(fixedRandomData)
}

return BlobWriter{
ctx: ctx,
waitGroup: waitGroup,
generator: generator,
verifier: verifier,
fixedRandomData: &fixedRandomData,
}
}

// Start begins the blob writer goroutine.
func (writer *BlobWriter) Start() {
writer.waitGroup.Add(1)
go func() {
writer.run()
writer.waitGroup.Done()
}()
}

// run sends blobs to a disperser at a configured rate.
// Continues and dues not return until the context is cancelled.
func (writer *BlobWriter) run() {
ticker := time.NewTicker(writer.generator.Config.WriteRequestInterval)
for {
select {
case <-(*writer.ctx).Done():
return
case <-ticker.C:
key, err := writer.sendRequest(*writer.getRandomData())

if err != nil {
writer.generator.Logger.Error("failed to send blob request", "err:", err)
continue
}

writer.verifier.AddUnconfirmedKey(&key)
}
}
}

// getRandomData returns a slice of random data to be used for a blob.
func (writer *BlobWriter) getRandomData() *[]byte {
if *writer.fixedRandomData != nil {
return writer.fixedRandomData
}

data := make([]byte, writer.generator.Config.DataSize)
_, err := rand.Read(data)
if err != nil {
panic(err)
}
data = codec.ConvertByPaddingEmptyByte(data)

return &data
}

// sendRequest sends a blob to a disperser.
func (writer *BlobWriter) sendRequest(data []byte) ([]byte /* key */, error) {
ctxTimeout, cancel := context.WithTimeout(*writer.ctx, writer.generator.Config.Timeout)
defer cancel()

if writer.generator.Config.SignerPrivateKey != "" {
blobStatus, key, err :=
writer.generator.DisperserClient.DisperseBlobAuthenticated(ctxTimeout, data, writer.generator.Config.CustomQuorums)
if err != nil {
return nil, err
}

writer.generator.Logger.Info("successfully dispersed new blob", "authenticated", true, "key", hex.EncodeToString(key), "status", blobStatus.String())
return key, nil
} else {
blobStatus, key, err := writer.generator.DisperserClient.DisperseBlob(ctxTimeout, data, writer.generator.Config.CustomQuorums)
if err != nil {
return nil, err
}

writer.generator.Logger.Info("successfully dispersed new blob", "authenticated", false, "key", hex.EncodeToString(key), "status", blobStatus.String())
return key, nil
}
}
90 changes: 8 additions & 82 deletions tools/traffic/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package traffic

import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"os"
"os/signal"
"sync"
Expand All @@ -14,7 +11,6 @@ import (
"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/encoding/utils/codec"
"github.com/Layr-Labs/eigensdk-go/logging"
)

Expand All @@ -41,13 +37,16 @@ func NewTrafficGenerator(config *Config, signer core.BlobRequestSigner) (*Traffi
// Run instantiates goroutines that generate read/write traffic, continues until a SIGTERM is observed.
func (g *TrafficGenerator) Run() error {
ctx, cancel := context.WithCancel(context.Background())

// TODO add configuration
table := NewBlobTable()
verifier := NewStatusVerifier(&table, &g.DisperserClient, -1)
verifier.Start(ctx, time.Second)

var wg sync.WaitGroup
for i := 0; i < int(g.Config.NumWriteInstances); i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = g.StartWriteWorker(ctx)
}()
writer := NewBlobWriter(&ctx, &wg, g, &verifier)
writer.Start()
time.Sleep(g.Config.InstanceLaunchInterval)
}
signals := make(chan os.Signal, 1)
Expand Down Expand Up @@ -79,76 +78,3 @@ func (g *TrafficGenerator) StartReadWorker(ctx context.Context) error {
func (g *TrafficGenerator) readRequest() {
// TODO
}

// StartWriteWorker periodically sends (possibly) random blobs to a disperser at a configured rate.
func (g *TrafficGenerator) StartWriteWorker(ctx context.Context) error {
data := make([]byte, g.Config.DataSize)
_, err := rand.Read(data)
if err != nil {
return err
}

// TODO configuration for this stuff
var table BlobTable = NewBlobTable()
var verifier StatusVerifier = NewStatusVerifier(&table, &g.DisperserClient, -1)
verifier.Start(ctx, time.Second)

paddedData := codec.ConvertByPaddingEmptyByte(data)

ticker := time.NewTicker(g.Config.WriteRequestInterval)
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
var key []byte
if g.Config.RandomizeBlobs {
_, err := rand.Read(data)
if err != nil {
return err
}
paddedData = codec.ConvertByPaddingEmptyByte(data)

key, err = g.sendRequest(ctx, paddedData[:g.Config.DataSize])

if err != nil {
g.Logger.Error("failed to send blob request", "err:", err)
}
paddedData = nil
} else {
key, err = g.sendRequest(ctx, paddedData[:g.Config.DataSize])
if err != nil {
g.Logger.Error("failed to send blob request", "err:", err)
}
}

fmt.Println("passing key to verifier") // TODO remove
verifier.AddUnconfirmedKey(&key)
fmt.Println("done passing key") // TODO remove
}
}
}

// sendRequest sends a blob to a disperser.
func (g *TrafficGenerator) sendRequest(ctx context.Context, data []byte) ([]byte /* key */, error) {
ctxTimeout, cancel := context.WithTimeout(ctx, g.Config.Timeout)
defer cancel()

if g.Config.SignerPrivateKey != "" {
blobStatus, key, err := g.DisperserClient.DisperseBlobAuthenticated(ctxTimeout, data, g.Config.CustomQuorums)
if err != nil {
return nil, err
}

g.Logger.Info("successfully dispersed new blob", "authenticated", true, "key", hex.EncodeToString(key), "status", blobStatus.String())
return key, nil
} else {
blobStatus, key, err := g.DisperserClient.DisperseBlob(ctxTimeout, data, g.Config.CustomQuorums)
if err != nil {
return nil, err
}

g.Logger.Info("successfully dispersed new blob", "authenticated", false, "key", hex.EncodeToString(key), "status", blobStatus.String())
return key, nil
}
}
4 changes: 2 additions & 2 deletions tools/traffic/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestTrafficGenerator(t *testing.T) {
Return(&processing, []byte{1}, nil)
ctx, cancel := context.WithCancel(context.Background())
go func() {
_ = trafficGenerator.StartWriteWorker(ctx)
_ = trafficGenerator.StartBlobWriter(ctx)
}()
time.Sleep(5 * time.Second)
cancel()
Expand Down Expand Up @@ -63,7 +63,7 @@ func TestTrafficGeneratorAuthenticated(t *testing.T) {
Return(&processing, []byte{1}, nil)
ctx, cancel := context.WithCancel(context.Background())
go func() {
_ = trafficGenerator.StartWriteWorker(ctx)
_ = trafficGenerator.StartBlobWriter(ctx)
}()
time.Sleep(5 * time.Second)
cancel()
Expand Down

0 comments on commit 3089bb0

Please sign in to comment.