Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(archive): archive final messages sent to client #34

Merged
merged 2 commits into from
Jan 10, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 20 additions & 18 deletions pkg/throughput1/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,21 +212,15 @@ func (p *Protocol) sendCounterflow(ctx context.Context,
select {
case <-ctx.Done():
// Attempt to send final write message before close. Ignore errors.
p.sendWireMeasurement(ctx, p.measurer.Measure(ctx))
p.sendAndSaveWireMessage(ctx, p.measurer.Measure(ctx), results)
p.close(ctx)
return
case m := <-measurerCh:
wm, err := p.sendWireMeasurement(ctx, m)
err := p.sendAndSaveWireMessage(ctx, m, results)
if err != nil {
errCh <- err
return
}
// This send is non-blocking in case there is no one to read the
// Measurement message and the channel's buffer is full.
select {
case results <- *wm:
default:
}

// End the test once enough bytes have been received.
if byteLimit > 0 && m.TCPInfo != nil && m.TCPInfo.BytesReceived >= byteLimit {
Expand All @@ -238,6 +232,21 @@ func (p *Protocol) sendCounterflow(ctx context.Context,
}
}

func (p *Protocol) sendAndSaveWireMessage(ctx context.Context, m model.Measurement, results chan<- model.WireMeasurement) error {
wm, err := p.sendWireMeasurement(ctx, m)
if err != nil {
return err
}

// This send is non-blocking in case there is no one to read the
// Measurement message and the channel's buffer is full.
select {
case results <- *wm:
default:
}
return nil
}

func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measurement,
results chan<- model.WireMeasurement, errCh chan<- error) {
size := p.ScaleMessage(spec.MinMessageSize, 0)
Expand All @@ -256,22 +265,15 @@ func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measureme
select {
case <-ctx.Done():
// Attempt to send final write message before close. Ignore errors.
p.sendWireMeasurement(ctx, p.measurer.Measure(ctx))
p.sendAndSaveWireMessage(ctx, p.measurer.Measure(ctx), results)
p.close(ctx)
return
case m := <-measurerCh:
wm, err := p.sendWireMeasurement(ctx, m)
err := p.sendAndSaveWireMessage(ctx, m, results)
if err != nil {
errCh <- err
return
}

// This send is non-blocking in case there is no one to read the
// Measurement message and the channel's buffer is full.
select {
case results <- *wm:
default:
}
default:
err = p.conn.WritePreparedMessage(message)
if err != nil {
Expand All @@ -283,7 +285,7 @@ func (p *Protocol) sender(ctx context.Context, measurerCh <-chan model.Measureme

bytesSent := int(p.applicationBytesSent.Load())
if p.byteLimit > 0 && bytesSent >= p.byteLimit {
_, err := p.sendWireMeasurement(ctx, p.measurer.Measure(ctx))
err := p.sendAndSaveWireMessage(ctx, p.measurer.Measure(ctx), results)
if err != nil {
errCh <- err
return
Expand Down