Skip to content

Commit

Permalink
Refactor code, only output every 100ms
Browse files Browse the repository at this point in the history
  • Loading branch information
robertodauria committed Sep 28, 2023
1 parent 61c3820 commit b9d430c
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 32 deletions.
68 changes: 45 additions & 23 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,23 @@ func (c *Throughput1Client) start(ctx context.Context, subtest spec.SubtestKind)
globalTimeout, cancel := context.WithTimeout(ctx, c.Length)
defer cancel()

// Reset the counters.
c.recvByteCounters = map[int][]int64{}
globalStartTime := time.Now()

go func() {
t := time.NewTicker(100 * time.Millisecond)
// Print goodput every 100ms. Stop when the context is cancelled.
for {
select {
case <-globalTimeout.Done():
return
case <-t.C:
c.emitResult(globalStartTime)
}
}
}()

// Main client loop. Spawns one goroutine per stream.
for i := 0; i < c.NumStreams; i++ {
streamID := i
Expand Down Expand Up @@ -288,51 +303,58 @@ func (c *Throughput1Client) runStream(ctx context.Context, streamID int, mURL *u
c.Emitter.OnComplete(streamID, mURL.Host)
return nil
case m := <-clientCh:
// If subtest is download, store the client-side measurement.
if subtest != spec.SubtestDownload {
continue
}
c.emitResults(streamID, m, globalStartTime)
c.Emitter.OnMeasurement(streamID, m)
c.Emitter.OnDebug(fmt.Sprintf("Stream #%d - application r/w: %d/%d, network r/w: %d/%d\n",
streamID, m.Application.BytesReceived, m.Application.BytesSent,
m.Network.BytesReceived, m.Network.BytesSent))
c.storeMeasurement(streamID, m)
case m := <-serverCh:
// If subtest is upload, store the server-side measurement.
if subtest != spec.SubtestUpload {
continue
}
c.emitResults(streamID, m, globalStartTime)
c.Emitter.OnMeasurement(streamID, m)
c.Emitter.OnDebug(fmt.Sprintf("#%d - application r/w: %d/%d, network r/w: %d/%d\n",
streamID, m.Application.BytesReceived, m.Application.BytesSent,
m.Network.BytesReceived, m.Network.BytesSent))
c.storeMeasurement(streamID, m)
case err := <-errCh:
return err
}
}
}

func (c *Throughput1Client) emitResults(streamID int, m model.WireMeasurement,
globalStartTime time.Time) {
c.Emitter.OnMeasurement(streamID, m)
elapsed := time.Since(globalStartTime)
streamResult := StreamResult{
StreamID: streamID,
Result: Result{
Elapsed: elapsed,
Goodput: float64(m.Application.BytesReceived) / float64(m.ElapsedTime) * 8,
Throughput: float64(m.Network.BytesReceived) / float64(m.ElapsedTime) * 8,
MinRTT: m.TCPInfo.MinRTT,
},
}
c.Emitter.OnStreamResult(streamResult)

// Append the value of the BytesReceived counter to the corresponding recvByteCounters map entry
// for the current streamID.
//
// Note: The recvByteCounters map may be accessed by multiple goroutines.
func (c *Throughput1Client) storeMeasurement(streamID int, m model.WireMeasurement) {
// Append the value of the Application.BytesReceived counter to the corresponding recvByteCounters map entry.
c.recvByteCountersMutex.Lock()
c.recvByteCounters[streamID] = append(c.recvByteCounters[streamID], m.Application.BytesReceived)
c.recvByteCountersMutex.Unlock()
}

// applicationBytes returns the aggregate application-level bytes transferred by all the streams.
func (c *Throughput1Client) applicationBytes() int64 {
var sum int64
c.recvByteCountersMutex.Lock()
for _, bytes := range c.recvByteCounters {
sum += bytes[len(bytes)-1]
}
c.recvByteCountersMutex.Unlock()
return sum
}

// emitResult emits the result of the current measurement via the configured Emitter.
func (c *Throughput1Client) emitResult(start time.Time) {
applicationBytes := c.applicationBytes()
elapsed := time.Since(start)
goodput := float64(applicationBytes) / float64(elapsed.Seconds()) * 8 // bps
result := Result{
Elapsed: elapsed,
Goodput: float64(sum) / float64(elapsed.Microseconds()) * 8,
Elapsed: elapsed,
Goodput: goodput,
Throughput: 0, // TODO
}
c.Emitter.OnResult(result)
}
Expand Down
11 changes: 2 additions & 9 deletions pkg/client/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ type Emitter interface {
OnConnect(server string)
OnMeasurement(id int, m model.WireMeasurement)
OnResult(Result)
OnStreamResult(StreamResult)
OnError(err error)
OnComplete(streamID int, server string)
OnDebug(msg string)
Expand All @@ -24,14 +23,8 @@ type HumanReadable struct {

// OnResult prints the aggregate result.
func (*HumanReadable) OnResult(r Result) {
fmt.Printf("Elapsed: %.2fs, Goodput: %f, MinRTT: %d\n", r.Elapsed.Seconds(),
r.Goodput, r.MinRTT)
}

// OnStreamResult prints the per-stream result.
func (*HumanReadable) OnStreamResult(sr StreamResult) {
fmt.Printf("\tStream #%d - gp %f, tp: %f, minrtt: %d\n", sr.StreamID,
sr.Goodput, sr.Throughput, sr.MinRTT)
fmt.Printf("Elapsed: %.2fs, Goodput: %f Mb/s, MinRTT: %d\n", r.Elapsed.Seconds(),
r.Goodput/1024/1024, r.MinRTT)
}

// OnStart is called when the stream starts.
Expand Down

0 comments on commit b9d430c

Please sign in to comment.