Skip to content

Commit

Permalink
Merge pull request #30 from m-lab/sandbox-soltesz-final-wiremessage
Browse files Browse the repository at this point in the history
Send a final WireMeasurement after byte limit reached
  • Loading branch information
stephen-soltesz authored Jan 4, 2024
2 parents 271633e + 9545fd5 commit af9f6c4
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 67 deletions.
37 changes: 26 additions & 11 deletions internal/measurer/measurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,22 @@ import (
"github.com/m-lab/msak/pkg/throughput1/spec"
)

type throughput1Measurer struct {
// Throughput1Measurer tracks state for collecting connection measurements.
type Throughput1Measurer struct {
connInfo netx.ConnInfo
startTime time.Time
bytesReadAtStart int64
bytesWrittenAtStart int64

dstChan chan model.Measurement

// ReadChan is a readable channel for measurements created by the measurer.
ReadChan <-chan model.Measurement
}

// New creates an empty Throughput1Measurer. The measurer must be started with Start.
func New() *Throughput1Measurer {
return &Throughput1Measurer{}
}

// Start starts a measurer goroutine that periodically reads the tcp_info and
Expand All @@ -31,7 +40,7 @@ type throughput1Measurer struct {
//
// The context determines the measurer goroutine's lifetime.
// If passed a connection that is not a netx.Conn, this function will panic.
func Start(ctx context.Context, conn net.Conn) <-chan model.Measurement {
func (m *Throughput1Measurer) Start(ctx context.Context, conn net.Conn) <-chan model.Measurement {
// Implementation note: this channel must be buffered to account for slow
// readers. The "typical" reader is an throughput1 send or receive loop, which
// might be busy with data r/w. The buffer size corresponds to at least 10
Expand All @@ -42,9 +51,10 @@ func Start(ctx context.Context, conn net.Conn) <-chan model.Measurement {

connInfo := netx.ToConnInfo(conn)
read, written := connInfo.ByteCounters()
m := &throughput1Measurer{
*m = Throughput1Measurer{
connInfo: connInfo,
dstChan: dst,
ReadChan: dst,
startTime: time.Now(),
// Byte counters are offset by their initial value, so that the
// BytesSent/BytesReceived fields represent "application-level bytes
Expand All @@ -55,10 +65,10 @@ func Start(ctx context.Context, conn net.Conn) <-chan model.Measurement {
bytesWrittenAtStart: int64(written),
}
go m.loop(ctx)
return dst
return m.ReadChan
}

func (m *throughput1Measurer) loop(ctx context.Context) {
func (m *Throughput1Measurer) loop(ctx context.Context) {
log.Debug("Measurer started", "context", ctx)
defer log.Debug("Measurer stopped", "context", ctx)
t, err := memoryless.NewTicker(ctx, memoryless.Config{
Expand All @@ -81,7 +91,16 @@ func (m *throughput1Measurer) loop(ctx context.Context) {
}
}

func (m *throughput1Measurer) measure(ctx context.Context) {
func (m *Throughput1Measurer) measure(ctx context.Context) {
select {
case <-ctx.Done():
// NOTHING
case m.dstChan <- m.Measure(ctx):
}
}

// Measure collects metrics about the life of the connection.
func (m *Throughput1Measurer) Measure(ctx context.Context) model.Measurement {
// On non-Linux systems, collecting kernel metrics WILL fail. In that case,
// we still want to return a (empty) Measurement.
bbrInfo, tcpInfo, err := m.connInfo.Info()
Expand All @@ -92,10 +111,7 @@ func (m *throughput1Measurer) measure(ctx context.Context) {
// Read current bytes counters.
totalRead, totalWritten := m.connInfo.ByteCounters()

select {
case <-ctx.Done():
// NOTHING
case m.dstChan <- model.Measurement{
return model.Measurement{
ElapsedTime: time.Since(m.startTime).Microseconds(),
Network: model.ByteCounters{
BytesSent: int64(totalWritten) - m.bytesWrittenAtStart,
Expand All @@ -106,6 +122,5 @@ func (m *throughput1Measurer) measure(ctx context.Context) {
LinuxTCPInfo: tcpInfo,
ElapsedTime: time.Since(m.connInfo.AcceptTime()).Microseconds(),
},
}:
}
}
3 changes: 2 additions & 1 deletion internal/measurer/measurer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ func TestNdt8Measurer_Start(t *testing.T) {
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mchan := measurer.Start(ctx, serverConn)
m := measurer.New()
mchan := m.Start(ctx, serverConn)
go func() {
_, err := serverConn.Write([]byte("test"))
rtx.Must(err, "failed to write to pipe")
Expand Down
98 changes: 43 additions & 55 deletions pkg/throughput1/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,10 @@ type senderFunc func(ctx context.Context,
measurerCh <-chan model.Measurement, results chan<- model.WireMeasurement,
errCh chan<- error)

// Measurer is an interface for collecting connection metrics.
type Measurer interface {
Start(context.Context, net.Conn) <-chan model.Measurement
}

// DefaultMeasurer is the default throughput1 measurer that wraps the measurer
// package's Start function.
type DefaultMeasurer struct{}

func (*DefaultMeasurer) Start(ctx context.Context,
c net.Conn) <-chan model.Measurement {
return measurer.Start(ctx, c)
Measure(ctx context.Context) model.Measurement
}

// Protocol is the implementation of the throughput1 protocol.
Expand All @@ -59,7 +52,7 @@ func New(conn *websocket.Conn) *Protocol {
connInfo: netx.ToConnInfo(conn.UnderlyingConn()),
// Seed randomness source with the current time.
rnd: rand.New(rand.NewSource(time.Now().UnixMilli())),
measurer: &DefaultMeasurer{},
measurer: measurer.New(),
}
}

Expand Down Expand Up @@ -185,51 +178,59 @@ func (p *Protocol) receiver(ctx context.Context,
}
}

func (p *Protocol) sendWireMeasurement(ctx context.Context, m model.Measurement) (*model.WireMeasurement, error) {
wm := model.WireMeasurement{}
p.once.Do(func() {
wm = p.createWireMeasurement(ctx)
})
wm.Measurement = m
wm.Application = model.ByteCounters{
BytesSent: p.applicationBytesSent.Load(),
BytesReceived: p.applicationBytesReceived.Load(),
}
// Encode as JSON separately so we can read the message size before
// sending.
jsonwm, err := json.Marshal(wm)
if err != nil {
log.Printf("failed to encode measurement (ctx: %p, err: %v)", ctx, err)
return nil, err
}
err = p.conn.WriteMessage(websocket.TextMessage, jsonwm)
if err != nil {
log.Printf("failed to write measurement JSON (ctx: %p, err: %v)", ctx, err)
return nil, err
}
p.applicationBytesSent.Add(int64(len(jsonwm)))
return &wm, nil
}

func (p *Protocol) sendCounterflow(ctx context.Context,
measurerCh <-chan model.Measurement, results chan<- model.WireMeasurement,
errCh chan<- error) {
byteLimit := int64(p.byteLimit)
for {
select {
case <-ctx.Done():
// Attempt to send final write message before close. Ignore errors.
p.sendWireMeasurement(ctx, p.measurer.Measure(ctx))
p.close(ctx)
return
case m := <-measurerCh:
wm := model.WireMeasurement{}
p.once.Do(func() {
wm = p.createWireMeasurement(ctx)
})
wm.Measurement = m
wm.Application = model.ByteCounters{
BytesSent: p.applicationBytesSent.Load(),
BytesReceived: p.applicationBytesReceived.Load(),
}
// Encode as JSON separately so we can read the message size before
// sending.
jsonwm, err := json.Marshal(wm)
wm, err := p.sendWireMeasurement(ctx, m)
if err != nil {
log.Printf("failed to encode measurement (ctx: %p, err: %v)",
ctx, err)
errCh <- err
return
}
err = p.conn.WriteMessage(websocket.TextMessage, jsonwm)
if err != nil {
log.Printf("failed to write measurement JSON (ctx: %p, err: %v)", ctx, err)
errCh <- err
return
}
p.applicationBytesSent.Add(int64(len(jsonwm)))

// This send is non-blocking in case there is no one to read the
// Measurement message and the channel's buffer is full.
select {
case results <- wm:
case results <- *wm:
default:
}

// End the test once enough bytes have been received.
if byteLimit > 0 && m.TCPInfo != nil && m.TCPInfo.BytesReceived >= byteLimit {
// WireMessage was just sent above, so we do not need to send another.
p.close(ctx)
return
}
Expand All @@ -254,39 +255,21 @@ func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measureme
for {
select {
case <-ctx.Done():
// Attempt to send final write message before close. Ignore errors.
p.sendWireMeasurement(ctx, p.measurer.Measure(ctx))
p.close(ctx)
return
case m := <-measurerCh:
wm := model.WireMeasurement{}
p.once.Do(func() {
wm = p.createWireMeasurement(ctx)
})
wm.Measurement = m
wm.Application = model.ByteCounters{
BytesReceived: p.applicationBytesReceived.Load(),
BytesSent: p.applicationBytesSent.Load(),
}
// Encode as JSON separately so we can read the message size before
// sending.
jsonwm, err := json.Marshal(wm)
if err != nil {
log.Printf("failed to encode measurement (ctx: %p, err: %v)",
ctx, err)
errCh <- err
return
}
err = p.conn.WriteMessage(websocket.TextMessage, jsonwm)
wm, err := p.sendWireMeasurement(ctx, m)
if err != nil {
log.Printf("failed to write measurement JSON (ctx: %p, err: %v)", ctx, err)
errCh <- err
return
}
p.applicationBytesSent.Add(int64(len(jsonwm)))

// This send is non-blocking in case there is no one to read the
// Measurement message and the channel's buffer is full.
select {
case results <- wm:
case results <- *wm:
default:
}
default:
Expand All @@ -300,6 +283,11 @@ func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measureme

bytesSent := int(p.applicationBytesSent.Load())
if p.byteLimit > 0 && bytesSent >= p.byteLimit {
_, err := p.sendWireMeasurement(ctx, p.measurer.Measure(ctx))
if err != nil {
errCh <- err
return
}
p.close(ctx)
return
}
Expand Down

0 comments on commit af9f6c4

Please sign in to comment.