Skip to content

Commit

Permalink
Apply limits over sent bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
cristinaleonr committed Dec 29, 2023
1 parent c47c1e1 commit eac5193
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 16 deletions.
39 changes: 23 additions & 16 deletions pkg/throughput1/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Protocol struct {
applicationBytesReceived atomic.Int64
applicationBytesSent atomic.Int64

byteLimit int64
byteLimit int
}

// New returns a new Protocol with the specified connection and every other
Expand All @@ -66,7 +66,7 @@ func New(conn *websocket.Conn) *Protocol {
// SetByteLimit sets the number of bytes sent after which a test (either download or upload) will stop.
// Set the value to zero to disable the byte limit.
func (p *Protocol) SetByteLimit(value int) {
p.byteLimit = int64(value)
p.byteLimit = value
}

// Upgrade takes a HTTP request and upgrades the connection to WebSocket.
Expand Down Expand Up @@ -188,6 +188,7 @@ func (p *Protocol) receiver(ctx context.Context,
func (p *Protocol) sendCounterflow(ctx context.Context,
measurerCh <-chan model.Measurement, results chan<- model.WireMeasurement,
errCh chan<- error) {
byteLimit := int64(p.byteLimit)
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -228,7 +229,7 @@ func (p *Protocol) sendCounterflow(ctx context.Context,
}

// End the test once enough bytes have been received.
if p.byteLimit > 0 && m.TCPInfo != nil && m.TCPInfo.BytesReceived >= p.byteLimit {
if byteLimit > 0 && m.TCPInfo != nil && m.TCPInfo.BytesReceived >= byteLimit {
p.close(ctx)
return
}
Expand All @@ -238,7 +239,7 @@ func (p *Protocol) sendCounterflow(ctx context.Context,

func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measurement,
results chan<- model.WireMeasurement, errCh chan<- error) {
size := spec.MinMessageSize
size := p.ScaleMessage(spec.MinMessageSize, 0)
message, err := p.makePreparedMessage(size)
if err != nil {
log.Printf("makePreparedMessage failed (ctx: %p)", ctx)
Expand Down Expand Up @@ -288,12 +289,6 @@ func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measureme
case results <- wm:
default:
}

// End the test once enough bytes have been acked.
if p.byteLimit > 0 && m.TCPInfo != nil && m.TCPInfo.BytesAcked >= p.byteLimit {
p.close(ctx)
return
}
default:
err = p.conn.WritePreparedMessage(message)
if err != nil {
Expand All @@ -303,27 +298,39 @@ func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measureme
}
p.applicationBytesSent.Add(int64(size))

// Determine whether it's time to scale the message size.
if size >= spec.MaxScaledMessageSize {
continue
bytesSent := int(p.applicationBytesSent.Load())
if p.byteLimit > 0 && bytesSent >= p.byteLimit {
p.close(ctx)
return
}

if size > int(p.applicationBytesSent.Load())/spec.ScalingFraction {
// Determine whether it's time to scale the message size.
if size >= spec.MaxScaledMessageSize || size > bytesSent/spec.ScalingFraction {
size = p.ScaleMessage(size, bytesSent)
continue
}

size *= 2
size = p.ScaleMessage(size*2, bytesSent)
message, err = p.makePreparedMessage(size)
if err != nil {
log.Printf("failed to make prepared message (ctx: %p, err: %v)", ctx, err)
errCh <- err
return
}

}
}
}

// ScaleMessage sets the binary message size taking into consideration byte limits.
func (p *Protocol) ScaleMessage(msgSize int, bytesSent int) int {
// Check if the next payload size will push the total number of bytes over the limit.
excess := bytesSent + msgSize - p.byteLimit
if p.byteLimit > 0 && excess > 0 {
msgSize -= excess
}
return msgSize
}

func (p *Protocol) close(ctx context.Context) {
msg := websocket.FormatCloseMessage(
websocket.CloseNormalClosure, "Done sending")
Expand Down
48 changes: 48 additions & 0 deletions pkg/throughput1/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,51 @@ func TestProtocol_Download(t *testing.T) {
}
}
}

func TestProtocol_ScaleMessage(t *testing.T) {
tests := []struct {
name string
byteLimit int
msgSize int
bytesSent int
want int
}{
{
name: "no-limit",
byteLimit: 0,
msgSize: 10,
bytesSent: 100,
want: 10,
},
{
name: "under-limit",
byteLimit: 200,
msgSize: 10,
bytesSent: 100,
want: 10,
},
{
name: "at-limit",
byteLimit: 110,
msgSize: 10,
bytesSent: 100,
want: 10,
},
{
name: "over-limit",
byteLimit: 110,
msgSize: 20,
bytesSent: 100,
want: 10,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &throughput1.Protocol{}
p.SetByteLimit(tt.byteLimit)
if got := p.ScaleMessage(tt.msgSize, tt.bytesSent); got != tt.want {
t.Errorf("Protocol.ScaleMessage() = %v, want %v", got, tt.want)
}
})
}
}

0 comments on commit eac5193

Please sign in to comment.