Skip to content

Commit

Permalink
Refactor code to extract runStream().
Browse files Browse the repository at this point in the history
  • Loading branch information
robertodauria committed Sep 27, 2023
1 parent 5e8b90e commit 706f888
Showing 1 changed file with 49 additions and 39 deletions.
88 changes: 49 additions & 39 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,54 +230,18 @@ func (c *Throughput1Client) start(ctx context.Context, subtest spec.SubtestKind)
globalStartTime := time.Now()
applicationBytes := map[int][]int64{}

// Main client loop. Spawns one goroutine per stream requested.
// Main client loop. Spawns one goroutine per stream.
for i := 0; i < c.NumStreams; i++ {
streamID := i
wg.Add(1)
measurements := make(chan model.WireMeasurement)

go func() {
defer wg.Done()

// Connect to mURL.
c.Emitter.OnStart(mURL.Host, subtest)
conn, err := c.connect(globalTimeout, mURL)
// Run a single stream.
err := c.runStream(globalTimeout, streamID, mURL, subtest, globalStartTime, applicationBytes)
if err != nil {
c.Emitter.OnError(err)
close(measurements)
return
}
c.Emitter.OnConnect(mURL.String())

proto := throughput1.New(conn)

var clientCh, serverCh <-chan model.WireMeasurement
var errCh <-chan error
switch subtest {
case spec.SubtestDownload:
clientCh, serverCh, errCh = proto.ReceiverLoop(globalTimeout)
case spec.SubtestUpload:
clientCh, serverCh, errCh = proto.SenderLoop(globalTimeout)
}

for {
select {
case <-globalTimeout.Done():
c.Emitter.OnComplete(streamID, mURL.Host)
return
case m := <-clientCh:
if subtest != spec.SubtestDownload {
continue
}
c.emitResults(streamID, m, globalStartTime, applicationBytes)
case m := <-serverCh:
if subtest != spec.SubtestUpload {
continue
}
c.emitResults(streamID, m, globalStartTime, applicationBytes)
case err := <-errCh:
c.Emitter.OnError(err)
}
}
}()

Expand All @@ -289,6 +253,52 @@ func (c *Throughput1Client) start(ctx context.Context, subtest spec.SubtestKind)
return nil
}

func (c *Throughput1Client) runStream(ctx context.Context, streamID int, mURL *url.URL,
subtest spec.SubtestKind, globalStartTime time.Time, applicationBytes map[int][]int64) error {

measurements := make(chan model.WireMeasurement)

c.Emitter.OnStart(mURL.Host, subtest)
conn, err := c.connect(ctx, mURL)
if err != nil {
c.Emitter.OnError(err)
close(measurements)
return err
}
c.Emitter.OnConnect(mURL.String())

proto := throughput1.New(conn)

var clientCh, serverCh <-chan model.WireMeasurement
var errCh <-chan error
switch subtest {
case spec.SubtestDownload:
clientCh, serverCh, errCh = proto.ReceiverLoop(ctx)
case spec.SubtestUpload:
clientCh, serverCh, errCh = proto.SenderLoop(ctx)
}

for {
select {
case <-ctx.Done():
c.Emitter.OnComplete(streamID, mURL.Host)
return nil
case m := <-clientCh:
if subtest != spec.SubtestDownload {
continue
}
c.emitResults(streamID, m, globalStartTime, applicationBytes)
case m := <-serverCh:
if subtest != spec.SubtestUpload {
continue
}
c.emitResults(streamID, m, globalStartTime, applicationBytes)
case err := <-errCh:
return err
}
}
}

func (c *Throughput1Client) emitResults(streamID int, m model.WireMeasurement,
globalStartTime time.Time, applicationBytes map[int][]int64) {
c.Emitter.OnMeasurement(streamID, m)
Expand Down

0 comments on commit 706f888

Please sign in to comment.