diff --git a/tools/traffic/Makefile b/tools/traffic/Makefile index c9cea11d4..3fa0618d8 100644 --- a/tools/traffic/Makefile +++ b/tools/traffic/Makefile @@ -8,10 +8,12 @@ build: clean run: build TRAFFIC_GENERATOR_HOSTNAME=localhost \ - TRAFFIC_GENERATOR_GRPC_PORT=32001 \ + TRAFFIC_GENERATOR_GRPC_PORT=32003 \ TRAFFIC_GENERATOR_TIMEOUT=10s \ - TRAFFIC_GENERATOR_NUM_INSTANCES=1 \ - TRAFFIC_GENERATOR_REQUEST_INTERVAL=1s \ + TRAFFIC_GENERATOR_NUM_WRITE_INSTANCES=1 \ + TRAFFIC_GENERATOR_WRITE_REQUEST_INTERVAL=1s \ TRAFFIC_GENERATOR_DATA_SIZE=1000 \ TRAFFIC_GENERATOR_RANDOMIZE_BLOBS=true \ ./bin/server + +# TODO do not merge change TRAFFIC_GENERATOR_GRPC_PORT=32003 \ No newline at end of file diff --git a/tools/traffic/config.go b/tools/traffic/config.go index ca8412570..215951a10 100644 --- a/tools/traffic/config.go +++ b/tools/traffic/config.go @@ -14,17 +14,30 @@ import ( type Config struct { clients.Config + // TODO add to flags.go + // The number of worker threads that generate read traffic. + NumReadInstances uint + // The period of the submission rate of read requests for each read worker thread. + ReadRequestInterval time.Duration + // For each blob, how many times should it be downloaded? If between 0.0 and 1.0, blob will be downloaded + // 0 or 1 times with the specified probability (e.g. 0.2 means each blob has a 20% chance of being downloaded). + // If greater than 1.0, then each blob will be downloaded the specified number of times. + DownloadRate float64 + // The minimum amount of time that must pass after a blob is written prior to the first read attempt being made. + ReadDelay time.Duration + // The number of worker threads that generate write traffic. - NumInstances uint - // The period of the submission rate of new blobs for each worker thread. - RequestInterval time.Duration + NumWriteInstances uint + // The period of the submission rate of new blobs for each write worker thread. + WriteRequestInterval time.Duration // The size of each blob dispersed, in bytes. DataSize uint64 - // Configures logging for the traffic generator. - LoggingConfig common.LoggerConfig // If true, then each blob will contain unique random data. If false, the same random data // will be dispersed for each blob by a particular worker thread. RandomizeBlobs bool + + // Configures logging for the traffic generator. + LoggingConfig common.LoggerConfig // The amount of time to sleep after launching each worker thread. InstanceLaunchInterval time.Duration @@ -52,8 +65,8 @@ func NewConfig(ctx *cli.Context) (*Config, error) { ctx.Duration(flags.TimeoutFlag.Name), ctx.GlobalBool(flags.UseSecureGrpcFlag.Name), ), - NumInstances: ctx.GlobalUint(flags.NumInstancesFlag.Name), - RequestInterval: ctx.Duration(flags.RequestIntervalFlag.Name), + NumWriteInstances: ctx.GlobalUint(flags.NumWriteInstancesFlag.Name), + WriteRequestInterval: ctx.Duration(flags.WriteRequestIntervalFlag.Name), DataSize: ctx.GlobalUint64(flags.DataSizeFlag.Name), LoggingConfig: *loggerConfig, RandomizeBlobs: ctx.GlobalBool(flags.RandomizeBlobsFlag.Name), diff --git a/tools/traffic/flags/flags.go b/tools/traffic/flags/flags.go index aca2eb11a..57ee87cba 100644 --- a/tools/traffic/flags/flags.go +++ b/tools/traffic/flags/flags.go @@ -34,17 +34,17 @@ var ( EnvVar: common.PrefixEnvVar(envPrefix, "TIMEOUT"), Value: 10 * time.Second, } - NumInstancesFlag = cli.UintFlag{ - Name: common.PrefixFlag(FlagPrefix, "num-instances"), - Usage: "Number of generator instances to run in parallel", + NumWriteInstancesFlag = cli.UintFlag{ + Name: common.PrefixFlag(FlagPrefix, "num-write-instances"), + Usage: "Number of generator instances producing write traffic to run in parallel", Required: true, - EnvVar: common.PrefixEnvVar(envPrefix, "NUM_INSTANCES"), + EnvVar: common.PrefixEnvVar(envPrefix, "NUM_WRITE_INSTANCES"), } - RequestIntervalFlag = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "request-interval"), - Usage: "Duration between requests", + WriteRequestIntervalFlag = cli.DurationFlag{ + Name: common.PrefixFlag(FlagPrefix, "write-request-interval"), + Usage: "Duration between write requests", Required: true, - EnvVar: common.PrefixEnvVar(envPrefix, "REQUEST_INTERVAL"), + EnvVar: common.PrefixEnvVar(envPrefix, "WRITE_REQUEST_INTERVAL"), Value: 30 * time.Second, } DataSizeFlag = cli.Uint64Flag{ @@ -89,8 +89,8 @@ var ( var requiredFlags = []cli.Flag{ HostnameFlag, GrpcPortFlag, - NumInstancesFlag, - RequestIntervalFlag, + NumWriteInstancesFlag, + WriteRequestIntervalFlag, DataSizeFlag, } diff --git a/tools/traffic/generator.go b/tools/traffic/generator.go index 2f8731ffd..67a621cc1 100644 --- a/tools/traffic/generator.go +++ b/tools/traffic/generator.go @@ -37,14 +37,15 @@ func NewTrafficGenerator(config *Config, signer core.BlobRequestSigner) (*Traffi }, nil } +// Run instantiates goroutines that generate read/write traffic, continues until a SIGTERM is observed. func (g *TrafficGenerator) Run() error { ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup - for i := 0; i < int(g.Config.NumInstances); i++ { + for i := 0; i < int(g.Config.NumWriteInstances); i++ { wg.Add(1) go func() { defer wg.Done() - _ = g.StartTraffic(ctx) + _ = g.StartWriteWorker(ctx) }() time.Sleep(g.Config.InstanceLaunchInterval) } @@ -57,7 +58,29 @@ func (g *TrafficGenerator) Run() error { return nil } -func (g *TrafficGenerator) StartTraffic(ctx context.Context) error { +// TODO maybe split reader/writer into separate files + +// StartReadWorker periodically requests to download random blobs at a configured rate. +func (g *TrafficGenerator) StartReadWorker(ctx context.Context) error { + ticker := time.NewTicker(g.Config.WriteRequestInterval) + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + // TODO determine which blob to download + g.readRequest() // TODO add parameters + } + } +} + +// readRequest reads a blob. +func (g *TrafficGenerator) readRequest() { + // TODO +} + +// StartWriteWorker periodically sends (possibly) random blobs to a disperser at a configured rate. +func (g *TrafficGenerator) StartWriteWorker(ctx context.Context) error { data := make([]byte, g.Config.DataSize) _, err := rand.Read(data) if err != nil { @@ -66,7 +89,7 @@ func (g *TrafficGenerator) StartTraffic(ctx context.Context) error { paddedData := codec.ConvertByPaddingEmptyByte(data) - ticker := time.NewTicker(g.Config.RequestInterval) + ticker := time.NewTicker(g.Config.WriteRequestInterval) for { select { case <-ctx.Done(): @@ -79,13 +102,13 @@ func (g *TrafficGenerator) StartTraffic(ctx context.Context) error { } paddedData = codec.ConvertByPaddingEmptyByte(data) - err = g.sendRequest(ctx, paddedData[:g.Config.DataSize]) + _, err = g.sendRequest(ctx, paddedData[:g.Config.DataSize]) if err != nil { g.Logger.Error("failed to send blob request", "err:", err) } paddedData = nil } else { - err = g.sendRequest(ctx, paddedData[:g.Config.DataSize]) + _, err = g.sendRequest(ctx, paddedData[:g.Config.DataSize]) if err != nil { g.Logger.Error("failed to send blob request", "err:", err) } @@ -95,26 +118,26 @@ func (g *TrafficGenerator) StartTraffic(ctx context.Context) error { } } -func (g *TrafficGenerator) sendRequest(ctx context.Context, data []byte) error { +// sendRequest sends a blob to a disperser. +func (g *TrafficGenerator) sendRequest(ctx context.Context, data []byte) ([]byte /* key */, error) { ctxTimeout, cancel := context.WithTimeout(ctx, g.Config.Timeout) defer cancel() if g.Config.SignerPrivateKey != "" { blobStatus, key, err := g.DisperserClient.DisperseBlobAuthenticated(ctxTimeout, data, g.Config.CustomQuorums) if err != nil { - return err + return nil, err } g.Logger.Info("successfully dispersed new blob", "authenticated", true, "key", hex.EncodeToString(key), "status", blobStatus.String()) - return nil + return key, nil } else { blobStatus, key, err := g.DisperserClient.DisperseBlob(ctxTimeout, data, g.Config.CustomQuorums) if err != nil { - return err + return nil, err } g.Logger.Info("successfully dispersed new blob", "authenticated", false, "key", hex.EncodeToString(key), "status", blobStatus.String()) - return nil + return key, nil } - } diff --git a/tools/traffic/generator_test.go b/tools/traffic/generator_test.go index b530ef5bd..b2ef6abb1 100644 --- a/tools/traffic/generator_test.go +++ b/tools/traffic/generator_test.go @@ -23,8 +23,8 @@ func TestTrafficGenerator(t *testing.T) { Config: clients.Config{ Timeout: 1 * time.Second, }, - DataSize: 1000_000, - RequestInterval: 2 * time.Second, + DataSize: 1000_000, + WriteRequestInterval: 2 * time.Second, }, DisperserClient: disperserClient, } @@ -34,7 +34,7 @@ func TestTrafficGenerator(t *testing.T) { Return(&processing, []byte{1}, nil) ctx, cancel := context.WithCancel(context.Background()) go func() { - _ = trafficGenerator.StartTraffic(ctx) + _ = trafficGenerator.StartWriteWorker(ctx) }() time.Sleep(5 * time.Second) cancel() @@ -51,9 +51,9 @@ func TestTrafficGeneratorAuthenticated(t *testing.T) { Config: clients.Config{ Timeout: 1 * time.Second, }, - DataSize: 1000_000, - RequestInterval: 2 * time.Second, - SignerPrivateKey: "Hi", + DataSize: 1000_000, + WriteRequestInterval: 2 * time.Second, + SignerPrivateKey: "Hi", }, DisperserClient: disperserClient, } @@ -63,7 +63,7 @@ func TestTrafficGeneratorAuthenticated(t *testing.T) { Return(&processing, []byte{1}, nil) ctx, cancel := context.WithCancel(context.Background()) go func() { - _ = trafficGenerator.StartTraffic(ctx) + _ = trafficGenerator.StartWriteWorker(ctx) }() time.Sleep(5 * time.Second) cancel()