Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send a final WireMeasurement after byte limit reached #30

Merged
merged 3 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 26 additions & 11 deletions internal/measurer/measurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,22 @@ import (
"github.com/m-lab/msak/pkg/throughput1/spec"
)

type throughput1Measurer struct {
// Throughput1Measurer tracks state for collecting connection measurements.
type Throughput1Measurer struct {
connInfo netx.ConnInfo
startTime time.Time
bytesReadAtStart int64
bytesWrittenAtStart int64

dstChan chan model.Measurement

// ReadChan is a readable channel for measurements created by the measurer.
ReadChan <-chan model.Measurement
}

// New creates an empty Throughput1Measurer. The measurer must be started with Start.
func New() *Throughput1Measurer {
return &Throughput1Measurer{}
}

// Start starts a measurer goroutine that periodically reads the tcp_info and
Expand All @@ -31,7 +40,7 @@ type throughput1Measurer struct {
//
// The context determines the measurer goroutine's lifetime.
// If passed a connection that is not a netx.Conn, this function will panic.
func Start(ctx context.Context, conn net.Conn) <-chan model.Measurement {
func (m *Throughput1Measurer) Start(ctx context.Context, conn net.Conn) <-chan model.Measurement {
// Implementation note: this channel must be buffered to account for slow
// readers. The "typical" reader is an throughput1 send or receive loop, which
// might be busy with data r/w. The buffer size corresponds to at least 10
Expand All @@ -42,9 +51,10 @@ func Start(ctx context.Context, conn net.Conn) <-chan model.Measurement {

connInfo := netx.ToConnInfo(conn)
read, written := connInfo.ByteCounters()
m := &throughput1Measurer{
*m = Throughput1Measurer{
connInfo: connInfo,
dstChan: dst,
ReadChan: dst,
startTime: time.Now(),
// Byte counters are offset by their initial value, so that the
// BytesSent/BytesReceived fields represent "application-level bytes
Expand All @@ -55,10 +65,10 @@ func Start(ctx context.Context, conn net.Conn) <-chan model.Measurement {
bytesWrittenAtStart: int64(written),
}
go m.loop(ctx)
return dst
return m.ReadChan
}

func (m *throughput1Measurer) loop(ctx context.Context) {
func (m *Throughput1Measurer) loop(ctx context.Context) {
log.Debug("Measurer started", "context", ctx)
defer log.Debug("Measurer stopped", "context", ctx)
t, err := memoryless.NewTicker(ctx, memoryless.Config{
Expand All @@ -81,7 +91,16 @@ func (m *throughput1Measurer) loop(ctx context.Context) {
}
}

func (m *throughput1Measurer) measure(ctx context.Context) {
func (m *Throughput1Measurer) measure(ctx context.Context) {
select {
case <-ctx.Done():
// NOTHING
case m.dstChan <- m.Measure(ctx):
}
}

// Measure collects metrics about the life of the connection.
func (m *Throughput1Measurer) Measure(ctx context.Context) model.Measurement {
// On non-Linux systems, collecting kernel metrics WILL fail. In that case,
// we still want to return a (empty) Measurement.
bbrInfo, tcpInfo, err := m.connInfo.Info()
Expand All @@ -92,10 +111,7 @@ func (m *throughput1Measurer) measure(ctx context.Context) {
// Read current bytes counters.
totalRead, totalWritten := m.connInfo.ByteCounters()

select {
case <-ctx.Done():
// NOTHING
case m.dstChan <- model.Measurement{
return model.Measurement{
ElapsedTime: time.Since(m.startTime).Microseconds(),
Network: model.ByteCounters{
BytesSent: int64(totalWritten) - m.bytesWrittenAtStart,
Expand All @@ -106,6 +122,5 @@ func (m *throughput1Measurer) measure(ctx context.Context) {
LinuxTCPInfo: tcpInfo,
ElapsedTime: time.Since(m.connInfo.AcceptTime()).Microseconds(),
},
}:
}
}
3 changes: 2 additions & 1 deletion internal/measurer/measurer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ func TestNdt8Measurer_Start(t *testing.T) {
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mchan := measurer.Start(ctx, serverConn)
m := measurer.New()
mchan := m.Start(ctx, serverConn)
go func() {
_, err := serverConn.Write([]byte("test"))
rtx.Must(err, "failed to write to pipe")
Expand Down
98 changes: 43 additions & 55 deletions pkg/throughput1/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,10 @@ type senderFunc func(ctx context.Context,
measurerCh <-chan model.Measurement, results chan<- model.WireMeasurement,
errCh chan<- error)

// Measurer is an interface for collecting connection metrics.
type Measurer interface {
Start(context.Context, net.Conn) <-chan model.Measurement
}

// DefaultMeasurer is the default throughput1 measurer that wraps the measurer
// package's Start function.
type DefaultMeasurer struct{}

func (*DefaultMeasurer) Start(ctx context.Context,
c net.Conn) <-chan model.Measurement {
return measurer.Start(ctx, c)
Measure(ctx context.Context) model.Measurement
}

// Protocol is the implementation of the throughput1 protocol.
Expand All @@ -59,7 +52,7 @@ func New(conn *websocket.Conn) *Protocol {
connInfo: netx.ToConnInfo(conn.UnderlyingConn()),
// Seed randomness source with the current time.
rnd: rand.New(rand.NewSource(time.Now().UnixMilli())),
measurer: &DefaultMeasurer{},
measurer: measurer.New(),
}
}

Expand Down Expand Up @@ -185,51 +178,59 @@ func (p *Protocol) receiver(ctx context.Context,
}
}

func (p *Protocol) sendWireMeasurement(ctx context.Context, m model.Measurement) (*model.WireMeasurement, error) {
wm := model.WireMeasurement{}
p.once.Do(func() {
wm = p.createWireMeasurement(ctx)
})
wm.Measurement = m
wm.Application = model.ByteCounters{
BytesSent: p.applicationBytesSent.Load(),
BytesReceived: p.applicationBytesReceived.Load(),
}
// Encode as JSON separately so we can read the message size before
// sending.
jsonwm, err := json.Marshal(wm)
if err != nil {
log.Printf("failed to encode measurement (ctx: %p, err: %v)", ctx, err)
return nil, err
}
err = p.conn.WriteMessage(websocket.TextMessage, jsonwm)
if err != nil {
log.Printf("failed to write measurement JSON (ctx: %p, err: %v)", ctx, err)
return nil, err
}
p.applicationBytesSent.Add(int64(len(jsonwm)))
return &wm, nil
}

func (p *Protocol) sendCounterflow(ctx context.Context,
measurerCh <-chan model.Measurement, results chan<- model.WireMeasurement,
errCh chan<- error) {
byteLimit := int64(p.byteLimit)
for {
select {
case <-ctx.Done():
// Attempt to send final write message before close. Ignore errors.
p.sendWireMeasurement(ctx, p.measurer.Measure(ctx))
p.close(ctx)
return
case m := <-measurerCh:
wm := model.WireMeasurement{}
p.once.Do(func() {
wm = p.createWireMeasurement(ctx)
})
wm.Measurement = m
wm.Application = model.ByteCounters{
BytesSent: p.applicationBytesSent.Load(),
BytesReceived: p.applicationBytesReceived.Load(),
}
// Encode as JSON separately so we can read the message size before
// sending.
jsonwm, err := json.Marshal(wm)
wm, err := p.sendWireMeasurement(ctx, m)
if err != nil {
log.Printf("failed to encode measurement (ctx: %p, err: %v)",
ctx, err)
errCh <- err
return
}
err = p.conn.WriteMessage(websocket.TextMessage, jsonwm)
if err != nil {
log.Printf("failed to write measurement JSON (ctx: %p, err: %v)", ctx, err)
errCh <- err
return
}
p.applicationBytesSent.Add(int64(len(jsonwm)))

// This send is non-blocking in case there is no one to read the
// Measurement message and the channel's buffer is full.
select {
case results <- wm:
case results <- *wm:
default:
}

// End the test once enough bytes have been received.
if byteLimit > 0 && m.TCPInfo != nil && m.TCPInfo.BytesReceived >= byteLimit {
// WireMessage was just sent above, so we do not need to send another.
p.close(ctx)
return
}
Expand All @@ -254,39 +255,21 @@ func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measureme
for {
select {
case <-ctx.Done():
// Attempt to send final write message before close. Ignore errors.
p.sendWireMeasurement(ctx, p.measurer.Measure(ctx))
p.close(ctx)
return
case m := <-measurerCh:
wm := model.WireMeasurement{}
p.once.Do(func() {
wm = p.createWireMeasurement(ctx)
})
wm.Measurement = m
wm.Application = model.ByteCounters{
BytesReceived: p.applicationBytesReceived.Load(),
BytesSent: p.applicationBytesSent.Load(),
}
// Encode as JSON separately so we can read the message size before
// sending.
jsonwm, err := json.Marshal(wm)
if err != nil {
log.Printf("failed to encode measurement (ctx: %p, err: %v)",
ctx, err)
errCh <- err
return
}
err = p.conn.WriteMessage(websocket.TextMessage, jsonwm)
wm, err := p.sendWireMeasurement(ctx, m)
if err != nil {
log.Printf("failed to write measurement JSON (ctx: %p, err: %v)", ctx, err)
errCh <- err
return
}
p.applicationBytesSent.Add(int64(len(jsonwm)))

// This send is non-blocking in case there is no one to read the
// Measurement message and the channel's buffer is full.
select {
case results <- wm:
case results <- *wm:
default:
}
default:
Expand All @@ -300,6 +283,11 @@ func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measureme

bytesSent := int(p.applicationBytesSent.Load())
if p.byteLimit > 0 && bytesSent >= p.byteLimit {
_, err := p.sendWireMeasurement(ctx, p.measurer.Measure(ctx))
if err != nil {
errCh <- err
return
}
p.close(ctx)
return
}
Expand Down