Skip to content

Commit

Permalink
Add summary at the end of the measurement
Browse files Browse the repository at this point in the history
  • Loading branch information
robertodauria committed Feb 6, 2024
1 parent 55a10b8 commit afa2e43
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 18 deletions.
2 changes: 2 additions & 0 deletions cmd/msak-client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,6 @@ func main() {
if *flagUpload {
cl.Upload(context.Background())
}

cl.PrintSummary()
}
54 changes: 40 additions & 14 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
DefaultWebSocketHandshakeTimeout = 5 * time.Second

// DefaultStreams is the default number of streams for a new client.
DefaultStreams = 3
DefaultStreams = 2

// DefaultLength is the default test duration for a new client.
DefaultLength = 5 * time.Second
Expand Down Expand Up @@ -98,6 +98,11 @@ type Throughput1Client struct {

// minRTT is the lowest RTT value observed across all streams.
minRTT atomic.Uint32

// lastResultForSubtest contains the last recorded measurement for the
// corresponding subtest (download/upload).
lastResultForSubtest map[spec.SubtestKind]Result
lastResultForSubtestMutex sync.Mutex
}

// Result contains the aggregate metrics collected during the test.
Expand All @@ -114,6 +119,14 @@ type Result struct {
RTT uint32
// MinRTT is the minimum of RTT values observed across all the streams.
MinRTT uint32
// Streams is the number of streams used in the test.
Streams int
// ByteLimit is the byte limit used in the test.
ByteLimit int
// Length is the length of the test.
Length time.Duration
// CongestionControl is the congestion control used in the test.
CongestionControl string
}

// makeUserAgent creates the user agent string.
Expand All @@ -139,6 +152,8 @@ func New(clientName, clientVersion string, config Config) *Throughput1Client {

tIndex: map[string]int{},
recvByteCounters: map[int][]int64{},

lastResultForSubtest: map[spec.SubtestKind]Result{},
}
}

Expand Down Expand Up @@ -262,7 +277,6 @@ func (c *Throughput1Client) start(ctx context.Context, subtest spec.SubtestKind)
}

wg.Wait()

return nil
}

Expand Down Expand Up @@ -317,7 +331,7 @@ func (c *Throughput1Client) runStream(ctx context.Context, streamID int, mURL *u
for {
select {
case <-ctx.Done():
c.config.Emitter.OnComplete(streamID, mURL.Host)
c.config.Emitter.OnStreamComplete(streamID, mURL.Host)
return nil
case m = <-clientCh:
// If subtest is download, store the client-side measurement.
Expand All @@ -339,7 +353,11 @@ func (c *Throughput1Client) runStream(ctx context.Context, streamID int, mURL *u
m.Network.BytesReceived, m.Network.BytesSent))
c.storeMeasurement(streamID, m)
if c.started.Load() {
c.emitResult(c.sharedStartTime)
res := c.computeResult()
c.config.Emitter.OnResult(res)
c.lastResultForSubtestMutex.Lock()
c.lastResultForSubtest[subtest] = res
c.lastResultForSubtestMutex.Unlock()
}
}
}
Expand Down Expand Up @@ -374,19 +392,22 @@ func (c *Throughput1Client) applicationBytes() int64 {
return sum
}

// emitResult emits the result of the current measurement via the configured Emitter.
func (c *Throughput1Client) emitResult(start time.Time) {
// computeResult returns a Result struct with the current state of the measurement.
func (c *Throughput1Client) computeResult() Result {
applicationBytes := c.applicationBytes()
elapsed := time.Since(start)
elapsed := time.Since(c.sharedStartTime)
goodput := float64(applicationBytes) / float64(elapsed.Seconds()) * 8 // bps
result := Result{
Elapsed: elapsed,
Goodput: goodput,
Throughput: 0, // TODO,
MinRTT: c.minRTT.Load(),
RTT: c.rtt.Load(),
return Result{
Elapsed: elapsed,
Goodput: goodput,
Throughput: 0, // TODO,
MinRTT: c.minRTT.Load(),
RTT: c.rtt.Load(),
Streams: c.config.NumStreams,
ByteLimit: c.config.ByteLimit,
Length: c.config.Length,
CongestionControl: c.config.CongestionControl,
}
c.config.Emitter.OnResult(result)
}

// Download runs a download test using the settings configured for this client.
Expand All @@ -405,6 +426,11 @@ func (c *Throughput1Client) Upload(ctx context.Context) {
}
}

// PrintSummary emits a summary via the configured emitter
func (c *Throughput1Client) PrintSummary() {
c.config.Emitter.OnSummary(c.lastResultForSubtest)
}

func getPathForSubtest(subtest spec.SubtestKind) string {
switch subtest {
case spec.SubtestDownload:
Expand Down
19 changes: 15 additions & 4 deletions pkg/client/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ type Emitter interface {
OnResult(Result)
// OnError is called on errors.
OnError(err error)
// OnComplete is called after a stream completes.
OnComplete(streamID int, server string)
// OnStreamComplete is called after a stream completes.
OnStreamComplete(streamID int, server string)
// OnDebug is called to print debug information.
OnDebug(msg string)
// OnSummary is called to print summary information.
OnSummary(results map[spec.SubtestKind]Result)
}

// HumanReadable prints human-readable output to stdout.
Expand Down Expand Up @@ -60,11 +62,20 @@ func (HumanReadable) OnError(err error) {
}
}

// OnComplete is called after a stream completes.
func (HumanReadable) OnComplete(streamID int, server string) {
// OnStreamComplete is called after a stream completes.
func (HumanReadable) OnStreamComplete(streamID int, server string) {
fmt.Printf("Stream %d complete (server %s)\n", streamID, server)
}

func (HumanReadable) OnSummary(results map[spec.SubtestKind]Result) {
fmt.Println()
fmt.Printf("Test results:\n")
for kind, result := range results {
fmt.Printf(" %s rate: %.2f Mb/s, rtt: %.2f ms, minrtt: %.2f ms\n",
kind, result.Goodput/1e6, float32(result.RTT)/1000, float32(result.MinRTT)/1000)
}
}

// OnDebug is called to print debug information.
func (e HumanReadable) OnDebug(msg string) {
if e.Debug {
Expand Down

0 comments on commit afa2e43

Please sign in to comment.