Skip to content

Commit

Permalink
Incremental progress.
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley committed Jul 17, 2024
1 parent af3ac59 commit 8191b7f
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 39 deletions.
8 changes: 5 additions & 3 deletions tools/traffic/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ build: clean

run: build
TRAFFIC_GENERATOR_HOSTNAME=localhost \
TRAFFIC_GENERATOR_GRPC_PORT=32001 \
TRAFFIC_GENERATOR_GRPC_PORT=32003 \
TRAFFIC_GENERATOR_TIMEOUT=10s \
TRAFFIC_GENERATOR_NUM_INSTANCES=1 \
TRAFFIC_GENERATOR_REQUEST_INTERVAL=1s \
TRAFFIC_GENERATOR_NUM_WRITE_INSTANCES=1 \
TRAFFIC_GENERATOR_WRITE_REQUEST_INTERVAL=1s \
TRAFFIC_GENERATOR_DATA_SIZE=1000 \
TRAFFIC_GENERATOR_RANDOMIZE_BLOBS=true \
./bin/server

# TODO do not merge change TRAFFIC_GENERATOR_GRPC_PORT=32003
27 changes: 20 additions & 7 deletions tools/traffic/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,30 @@ import (
type Config struct {
clients.Config

// TODO add to flags.go
// The number of worker threads that generate read traffic.
NumReadInstances uint
// The period of the submission rate of read requests for each read worker thread.
ReadRequestInterval time.Duration
// For each blob, how many times should it be downloaded? If between 0.0 and 1.0, blob will be downloaded
// 0 or 1 times with the specified probability (e.g. 0.2 means each blob has a 20% chance of being downloaded).
// If greater than 1.0, then each blob will be downloaded the specified number of times.
DownloadRate float64
// The minimum amount of time that must pass after a blob is written prior to the first read attempt being made.
ReadDelay time.Duration

// The number of worker threads that generate write traffic.
NumInstances uint
// The period of the submission rate of new blobs for each worker thread.
RequestInterval time.Duration
NumWriteInstances uint
// The period of the submission rate of new blobs for each write worker thread.
WriteRequestInterval time.Duration
// The size of each blob dispersed, in bytes.
DataSize uint64
// Configures logging for the traffic generator.
LoggingConfig common.LoggerConfig
// If true, then each blob will contain unique random data. If false, the same random data
// will be dispersed for each blob by a particular worker thread.
RandomizeBlobs bool

// Configures logging for the traffic generator.
LoggingConfig common.LoggerConfig
// The amount of time to sleep after launching each worker thread.
InstanceLaunchInterval time.Duration

Expand Down Expand Up @@ -52,8 +65,8 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
ctx.Duration(flags.TimeoutFlag.Name),
ctx.GlobalBool(flags.UseSecureGrpcFlag.Name),
),
NumInstances: ctx.GlobalUint(flags.NumInstancesFlag.Name),
RequestInterval: ctx.Duration(flags.RequestIntervalFlag.Name),
NumWriteInstances: ctx.GlobalUint(flags.NumWriteInstancesFlag.Name),
WriteRequestInterval: ctx.Duration(flags.WriteRequestIntervalFlag.Name),
DataSize: ctx.GlobalUint64(flags.DataSizeFlag.Name),
LoggingConfig: *loggerConfig,
RandomizeBlobs: ctx.GlobalBool(flags.RandomizeBlobsFlag.Name),
Expand Down
20 changes: 10 additions & 10 deletions tools/traffic/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,17 @@ var (
EnvVar: common.PrefixEnvVar(envPrefix, "TIMEOUT"),
Value: 10 * time.Second,
}
NumInstancesFlag = cli.UintFlag{
Name: common.PrefixFlag(FlagPrefix, "num-instances"),
Usage: "Number of generator instances to run in parallel",
NumWriteInstancesFlag = cli.UintFlag{
Name: common.PrefixFlag(FlagPrefix, "num-write-instances"),
Usage: "Number of generator instances producing write traffic to run in parallel",
Required: true,
EnvVar: common.PrefixEnvVar(envPrefix, "NUM_INSTANCES"),
EnvVar: common.PrefixEnvVar(envPrefix, "NUM_WRITE_INSTANCES"),
}
RequestIntervalFlag = cli.DurationFlag{
Name: common.PrefixFlag(FlagPrefix, "request-interval"),
Usage: "Duration between requests",
WriteRequestIntervalFlag = cli.DurationFlag{
Name: common.PrefixFlag(FlagPrefix, "write-request-interval"),
Usage: "Duration between write requests",
Required: true,
EnvVar: common.PrefixEnvVar(envPrefix, "REQUEST_INTERVAL"),
EnvVar: common.PrefixEnvVar(envPrefix, "WRITE_REQUEST_INTERVAL"),
Value: 30 * time.Second,
}
DataSizeFlag = cli.Uint64Flag{
Expand Down Expand Up @@ -89,8 +89,8 @@ var (
var requiredFlags = []cli.Flag{
HostnameFlag,
GrpcPortFlag,
NumInstancesFlag,
RequestIntervalFlag,
NumWriteInstancesFlag,
WriteRequestIntervalFlag,
DataSizeFlag,
}

Expand Down
47 changes: 35 additions & 12 deletions tools/traffic/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,15 @@ func NewTrafficGenerator(config *Config, signer core.BlobRequestSigner) (*Traffi
}, nil
}

// Run instantiates goroutines that generate read/write traffic, continues until a SIGTERM is observed.
func (g *TrafficGenerator) Run() error {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
for i := 0; i < int(g.Config.NumInstances); i++ {
for i := 0; i < int(g.Config.NumWriteInstances); i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = g.StartTraffic(ctx)
_ = g.StartWriteWorker(ctx)
}()
time.Sleep(g.Config.InstanceLaunchInterval)
}
Expand All @@ -57,7 +58,29 @@ func (g *TrafficGenerator) Run() error {
return nil
}

func (g *TrafficGenerator) StartTraffic(ctx context.Context) error {
// TODO maybe split reader/writer into separate files

// StartReadWorker periodically requests to download random blobs at a configured rate.
func (g *TrafficGenerator) StartReadWorker(ctx context.Context) error {
ticker := time.NewTicker(g.Config.WriteRequestInterval)
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
// TODO determine which blob to download
g.readRequest() // TODO add parameters
}
}
}

// readRequest reads a blob.
func (g *TrafficGenerator) readRequest() {
// TODO
}

// StartWriteWorker periodically sends (possibly) random blobs to a disperser at a configured rate.
func (g *TrafficGenerator) StartWriteWorker(ctx context.Context) error {
data := make([]byte, g.Config.DataSize)
_, err := rand.Read(data)
if err != nil {
Expand All @@ -66,7 +89,7 @@ func (g *TrafficGenerator) StartTraffic(ctx context.Context) error {

paddedData := codec.ConvertByPaddingEmptyByte(data)

ticker := time.NewTicker(g.Config.RequestInterval)
ticker := time.NewTicker(g.Config.WriteRequestInterval)
for {
select {
case <-ctx.Done():
Expand All @@ -79,13 +102,13 @@ func (g *TrafficGenerator) StartTraffic(ctx context.Context) error {
}
paddedData = codec.ConvertByPaddingEmptyByte(data)

err = g.sendRequest(ctx, paddedData[:g.Config.DataSize])
_, err = g.sendRequest(ctx, paddedData[:g.Config.DataSize])
if err != nil {
g.Logger.Error("failed to send blob request", "err:", err)
}
paddedData = nil
} else {
err = g.sendRequest(ctx, paddedData[:g.Config.DataSize])
_, err = g.sendRequest(ctx, paddedData[:g.Config.DataSize])
if err != nil {
g.Logger.Error("failed to send blob request", "err:", err)
}
Expand All @@ -95,26 +118,26 @@ func (g *TrafficGenerator) StartTraffic(ctx context.Context) error {
}
}

func (g *TrafficGenerator) sendRequest(ctx context.Context, data []byte) error {
// sendRequest sends a blob to a disperser.
func (g *TrafficGenerator) sendRequest(ctx context.Context, data []byte) ([]byte /* key */, error) {
ctxTimeout, cancel := context.WithTimeout(ctx, g.Config.Timeout)
defer cancel()

if g.Config.SignerPrivateKey != "" {
blobStatus, key, err := g.DisperserClient.DisperseBlobAuthenticated(ctxTimeout, data, g.Config.CustomQuorums)
if err != nil {
return err
return nil, err
}

g.Logger.Info("successfully dispersed new blob", "authenticated", true, "key", hex.EncodeToString(key), "status", blobStatus.String())
return nil
return key, nil
} else {
blobStatus, key, err := g.DisperserClient.DisperseBlob(ctxTimeout, data, g.Config.CustomQuorums)
if err != nil {
return err
return nil, err
}

g.Logger.Info("successfully dispersed new blob", "authenticated", false, "key", hex.EncodeToString(key), "status", blobStatus.String())
return nil
return key, nil
}

}
14 changes: 7 additions & 7 deletions tools/traffic/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func TestTrafficGenerator(t *testing.T) {
Config: clients.Config{
Timeout: 1 * time.Second,
},
DataSize: 1000_000,
RequestInterval: 2 * time.Second,
DataSize: 1000_000,
WriteRequestInterval: 2 * time.Second,
},
DisperserClient: disperserClient,
}
Expand All @@ -34,7 +34,7 @@ func TestTrafficGenerator(t *testing.T) {
Return(&processing, []byte{1}, nil)
ctx, cancel := context.WithCancel(context.Background())
go func() {
_ = trafficGenerator.StartTraffic(ctx)
_ = trafficGenerator.StartWriteWorker(ctx)
}()
time.Sleep(5 * time.Second)
cancel()
Expand All @@ -51,9 +51,9 @@ func TestTrafficGeneratorAuthenticated(t *testing.T) {
Config: clients.Config{
Timeout: 1 * time.Second,
},
DataSize: 1000_000,
RequestInterval: 2 * time.Second,
SignerPrivateKey: "Hi",
DataSize: 1000_000,
WriteRequestInterval: 2 * time.Second,
SignerPrivateKey: "Hi",
},
DisperserClient: disperserClient,
}
Expand All @@ -63,7 +63,7 @@ func TestTrafficGeneratorAuthenticated(t *testing.T) {
Return(&processing, []byte{1}, nil)
ctx, cancel := context.WithCancel(context.Background())
go func() {
_ = trafficGenerator.StartTraffic(ctx)
_ = trafficGenerator.StartWriteWorker(ctx)
}()
time.Sleep(5 * time.Second)
cancel()
Expand Down

0 comments on commit 8191b7f

Please sign in to comment.