Skip to content

Commit

Permalink
Fix goodput calculation in msak-client (#36)
Browse files Browse the repository at this point in the history
* Fix goodput calculation

* Emit output when a measurement comes in
  • Loading branch information
robertodauria authored Jan 11, 2024
1 parent ac3c733 commit e8e5455
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 19 deletions.
29 changes: 11 additions & 18 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/url"
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/gorilla/websocket"
Expand Down Expand Up @@ -90,6 +91,7 @@ type Throughput1Client struct {
// sharedStartTime is the time at which the test started, shared across all streams.
// It is set when the first streams connects to the server and used to compute the elapsed time.
sharedStartTime time.Time
started atomic.Bool
}

// Result contains the aggregate metrics collected during the test.
Expand Down Expand Up @@ -221,16 +223,15 @@ func (c *Throughput1Client) start(ctx context.Context, subtest spec.SubtestKind)
go func() {
// Wait for the start signal to come from any of the streams.
// Returns early if the context is cancelled.
started := c.waitStart(testCtx, startTimeCh)
if !started {

c.started.Store(c.waitStart(testCtx, startTimeCh))
if !c.started.Load() {
return
}

// Once at least one of the streams has started, start a timer to cancel
// the context after the configured test duration.
time.AfterFunc(c.config.Length, cancelTest)

c.emitLoop(testCtx)
}()

// Main client loop. Spawns one goroutine per stream.
Expand Down Expand Up @@ -267,20 +268,6 @@ func (c *Throughput1Client) waitStart(ctx context.Context, startTimeCh chan time
return true
}

// emitLoop emits the results every 100ms once a . It stops when the context is cancelled.
func (c *Throughput1Client) emitLoop(ctx context.Context) {
t := time.NewTicker(100 * time.Millisecond)

for {
select {
case <-ctx.Done():
return
case <-t.C:
c.emitResult(c.sharedStartTime)
}
}
}

func (c *Throughput1Client) runStream(ctx context.Context, streamID int, mURL *url.URL,
subtest spec.SubtestKind, startTimeCh chan time.Time) error {

Expand Down Expand Up @@ -331,6 +318,9 @@ func (c *Throughput1Client) runStream(ctx context.Context, streamID int, mURL *u
streamID, m.Application.BytesReceived, m.Application.BytesSent,
m.Network.BytesReceived, m.Network.BytesSent))
c.storeMeasurement(streamID, m)
if c.started.Load() {
c.emitResult(c.sharedStartTime)
}
case m := <-serverCh:
// If subtest is upload, store the server-side measurement.
if subtest != spec.SubtestUpload {
Expand All @@ -341,6 +331,9 @@ func (c *Throughput1Client) runStream(ctx context.Context, streamID int, mURL *u
streamID, m.Application.BytesReceived, m.Application.BytesSent,
m.Network.BytesReceived, m.Network.BytesSent))
c.storeMeasurement(streamID, m)
if c.started.Load() {
c.emitResult(c.sharedStartTime)
}
case err := <-errCh:
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type HumanReadable struct {
// OnResult prints the aggregate result.
func (HumanReadable) OnResult(r Result) {
fmt.Printf("Elapsed: %.2fs, Goodput: %f Mb/s, MinRTT: %d\n", r.Elapsed.Seconds(),
r.Goodput/1024/1024, r.MinRTT)
r.Goodput/1e6, r.MinRTT)
}

// OnStart is called when the stream starts and prints the subtest and server hostname.
Expand Down

0 comments on commit e8e5455

Please sign in to comment.