From afa2e43b3e8cc18c222a4aefa2ca59eb1d72270f Mon Sep 17 00:00:00 2001 From: Roberto D'Auria Date: Tue, 6 Feb 2024 12:19:38 +0100 Subject: [PATCH] Add summary at the end of the measurement --- cmd/msak-client/client.go | 2 ++ pkg/client/client.go | 54 +++++++++++++++++++++++++++++---------- pkg/client/emitter.go | 19 +++++++++++--- 3 files changed, 57 insertions(+), 18 deletions(-) diff --git a/cmd/msak-client/client.go b/cmd/msak-client/client.go index ebc0557..70cdbed 100644 --- a/cmd/msak-client/client.go +++ b/cmd/msak-client/client.go @@ -65,4 +65,6 @@ func main() { if *flagUpload { cl.Upload(context.Background()) } + + cl.PrintSummary() } diff --git a/pkg/client/client.go b/pkg/client/client.go index 87a351e..f15e525 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -30,7 +30,7 @@ const ( DefaultWebSocketHandshakeTimeout = 5 * time.Second // DefaultStreams is the default number of streams for a new client. - DefaultStreams = 3 + DefaultStreams = 2 // DefaultLength is the default test duration for a new client. DefaultLength = 5 * time.Second @@ -98,6 +98,11 @@ type Throughput1Client struct { // minRTT is the lowest RTT value observed across all streams. minRTT atomic.Uint32 + + // lastResultForSubtest contains the last recorded measurement for the + // corresponding subtest (download/upload). + lastResultForSubtest map[spec.SubtestKind]Result + lastResultForSubtestMutex sync.Mutex } // Result contains the aggregate metrics collected during the test. @@ -114,6 +119,14 @@ type Result struct { RTT uint32 // MinRTT is the minimum of RTT values observed across all the streams. MinRTT uint32 + // Streams is the number of streams used in the test. + Streams int + // ByteLimit is the byte limit used in the test. + ByteLimit int + // Length is the length of the test. + Length time.Duration + // CongestionControl is the congestion control used in the test. + CongestionControl string } // makeUserAgent creates the user agent string. @@ -139,6 +152,8 @@ func New(clientName, clientVersion string, config Config) *Throughput1Client { tIndex: map[string]int{}, recvByteCounters: map[int][]int64{}, + + lastResultForSubtest: map[spec.SubtestKind]Result{}, } } @@ -262,7 +277,6 @@ func (c *Throughput1Client) start(ctx context.Context, subtest spec.SubtestKind) } wg.Wait() - return nil } @@ -317,7 +331,7 @@ func (c *Throughput1Client) runStream(ctx context.Context, streamID int, mURL *u for { select { case <-ctx.Done(): - c.config.Emitter.OnComplete(streamID, mURL.Host) + c.config.Emitter.OnStreamComplete(streamID, mURL.Host) return nil case m = <-clientCh: // If subtest is download, store the client-side measurement. @@ -339,7 +353,11 @@ func (c *Throughput1Client) runStream(ctx context.Context, streamID int, mURL *u m.Network.BytesReceived, m.Network.BytesSent)) c.storeMeasurement(streamID, m) if c.started.Load() { - c.emitResult(c.sharedStartTime) + res := c.computeResult() + c.config.Emitter.OnResult(res) + c.lastResultForSubtestMutex.Lock() + c.lastResultForSubtest[subtest] = res + c.lastResultForSubtestMutex.Unlock() } } } @@ -374,19 +392,22 @@ func (c *Throughput1Client) applicationBytes() int64 { return sum } -// emitResult emits the result of the current measurement via the configured Emitter. -func (c *Throughput1Client) emitResult(start time.Time) { +// computeResult returns a Result struct with the current state of the measurement. +func (c *Throughput1Client) computeResult() Result { applicationBytes := c.applicationBytes() - elapsed := time.Since(start) + elapsed := time.Since(c.sharedStartTime) goodput := float64(applicationBytes) / float64(elapsed.Seconds()) * 8 // bps - result := Result{ - Elapsed: elapsed, - Goodput: goodput, - Throughput: 0, // TODO, - MinRTT: c.minRTT.Load(), - RTT: c.rtt.Load(), + return Result{ + Elapsed: elapsed, + Goodput: goodput, + Throughput: 0, // TODO, + MinRTT: c.minRTT.Load(), + RTT: c.rtt.Load(), + Streams: c.config.NumStreams, + ByteLimit: c.config.ByteLimit, + Length: c.config.Length, + CongestionControl: c.config.CongestionControl, } - c.config.Emitter.OnResult(result) } // Download runs a download test using the settings configured for this client. @@ -405,6 +426,11 @@ func (c *Throughput1Client) Upload(ctx context.Context) { } } +// PrintSummary emits a summary via the configured emitter +func (c *Throughput1Client) PrintSummary() { + c.config.Emitter.OnSummary(c.lastResultForSubtest) +} + func getPathForSubtest(subtest spec.SubtestKind) string { switch subtest { case spec.SubtestDownload: diff --git a/pkg/client/emitter.go b/pkg/client/emitter.go index 577429e..694abf5 100644 --- a/pkg/client/emitter.go +++ b/pkg/client/emitter.go @@ -20,10 +20,12 @@ type Emitter interface { OnResult(Result) // OnError is called on errors. OnError(err error) - // OnComplete is called after a stream completes. - OnComplete(streamID int, server string) + // OnStreamComplete is called after a stream completes. + OnStreamComplete(streamID int, server string) // OnDebug is called to print debug information. OnDebug(msg string) + // OnSummary is called to print summary information. + OnSummary(results map[spec.SubtestKind]Result) } // HumanReadable prints human-readable output to stdout. @@ -60,11 +62,20 @@ func (HumanReadable) OnError(err error) { } } -// OnComplete is called after a stream completes. -func (HumanReadable) OnComplete(streamID int, server string) { +// OnStreamComplete is called after a stream completes. +func (HumanReadable) OnStreamComplete(streamID int, server string) { fmt.Printf("Stream %d complete (server %s)\n", streamID, server) } +func (HumanReadable) OnSummary(results map[spec.SubtestKind]Result) { + fmt.Println() + fmt.Printf("Test results:\n") + for kind, result := range results { + fmt.Printf(" %s rate: %.2f Mb/s, rtt: %.2f ms, minrtt: %.2f ms\n", + kind, result.Goodput/1e6, float32(result.RTT)/1000, float32(result.MinRTT)/1000) + } +} + // OnDebug is called to print debug information. func (e HumanReadable) OnDebug(msg string) { if e.Debug {