diff --git a/pkg/client/client.go b/pkg/client/client.go index 54e59f1..02d90b0 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -230,54 +230,18 @@ func (c *Throughput1Client) start(ctx context.Context, subtest spec.SubtestKind) globalStartTime := time.Now() applicationBytes := map[int][]int64{} - // Main client loop. Spawns one goroutine per stream requested. + // Main client loop. Spawns one goroutine per stream. for i := 0; i < c.NumStreams; i++ { streamID := i wg.Add(1) - measurements := make(chan model.WireMeasurement) go func() { defer wg.Done() - // Connect to mURL. - c.Emitter.OnStart(mURL.Host, subtest) - conn, err := c.connect(globalTimeout, mURL) + // Run a single stream. + err := c.runStream(globalTimeout, streamID, mURL, subtest, globalStartTime, applicationBytes) if err != nil { c.Emitter.OnError(err) - close(measurements) - return - } - c.Emitter.OnConnect(mURL.String()) - - proto := throughput1.New(conn) - - var clientCh, serverCh <-chan model.WireMeasurement - var errCh <-chan error - switch subtest { - case spec.SubtestDownload: - clientCh, serverCh, errCh = proto.ReceiverLoop(globalTimeout) - case spec.SubtestUpload: - clientCh, serverCh, errCh = proto.SenderLoop(globalTimeout) - } - - for { - select { - case <-globalTimeout.Done(): - c.Emitter.OnComplete(streamID, mURL.Host) - return - case m := <-clientCh: - if subtest != spec.SubtestDownload { - continue - } - c.emitResults(streamID, m, globalStartTime, applicationBytes) - case m := <-serverCh: - if subtest != spec.SubtestUpload { - continue - } - c.emitResults(streamID, m, globalStartTime, applicationBytes) - case err := <-errCh: - c.Emitter.OnError(err) - } } }() @@ -289,6 +253,52 @@ func (c *Throughput1Client) start(ctx context.Context, subtest spec.SubtestKind) return nil } +func (c *Throughput1Client) runStream(ctx context.Context, streamID int, mURL *url.URL, + subtest spec.SubtestKind, globalStartTime time.Time, applicationBytes map[int][]int64) error { + + measurements := make(chan model.WireMeasurement) + + c.Emitter.OnStart(mURL.Host, subtest) + conn, err := c.connect(ctx, mURL) + if err != nil { + c.Emitter.OnError(err) + close(measurements) + return err + } + c.Emitter.OnConnect(mURL.String()) + + proto := throughput1.New(conn) + + var clientCh, serverCh <-chan model.WireMeasurement + var errCh <-chan error + switch subtest { + case spec.SubtestDownload: + clientCh, serverCh, errCh = proto.ReceiverLoop(ctx) + case spec.SubtestUpload: + clientCh, serverCh, errCh = proto.SenderLoop(ctx) + } + + for { + select { + case <-ctx.Done(): + c.Emitter.OnComplete(streamID, mURL.Host) + return nil + case m := <-clientCh: + if subtest != spec.SubtestDownload { + continue + } + c.emitResults(streamID, m, globalStartTime, applicationBytes) + case m := <-serverCh: + if subtest != spec.SubtestUpload { + continue + } + c.emitResults(streamID, m, globalStartTime, applicationBytes) + case err := <-errCh: + return err + } + } +} + func (c *Throughput1Client) emitResults(streamID int, m model.WireMeasurement, globalStartTime time.Time, applicationBytes map[int][]int64) { c.Emitter.OnMeasurement(streamID, m)