Skip to content

Commit

Permalink
fix: recording time elapsed during connection reads and use it to cal…
Browse files Browse the repository at this point in the history
…culate the connection speed
  • Loading branch information
WendelHime committed Dec 12, 2024
1 parent 43b2bc4 commit 457c2e7
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions dialer/bandit.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,10 @@ func (bd *BanditDialer) DialContext(ctx context.Context, network, addr string) (

// Tell the dialer to update the bandit with it's throughput after 5 seconds.
var dataRecv atomic.Uint64
dt := newDataTrackingConn(conn, &dataRecv)
var elapsedTimeReading atomic.Int64
dt := newDataTrackingConn(conn, &dataRecv, &elapsedTimeReading)
time.AfterFunc(secondsForSample*time.Second, func() {
speed := normalizeReceiveSpeed(dataRecv.Load())
speed := normalizeReceiveSpeed(dataRecv.Load(), elapsedTimeReading.Load())
//log.Debugf("Dialer %v received %v bytes in %v seconds, normalized speed: %v", d.Name(), dt.dataRecv, secondsForSample, speed)
if err = bd.bandit.Update(chosenArm, speed); err != nil {
log.Errorf("unable to update bandit: %v", err)
Expand Down Expand Up @@ -343,9 +344,9 @@ const secondsForSample = 6
// Anything over this will be normalized to over 1.
const topExpectedBps = 125000

func normalizeReceiveSpeed(dataRecv uint64) float64 {
func normalizeReceiveSpeed(dataRecv uint64, elapsedTimeReading int64) float64 {
// Record the bytes in relation to the top expected speed.
return (float64(dataRecv) / secondsForSample) / topExpectedBps
return (float64(dataRecv) / float64(elapsedTimeReading*1000)) / topExpectedBps
}

func (o *BanditDialer) Close() {
Expand All @@ -355,19 +356,25 @@ func (o *BanditDialer) Close() {
}
}

func newDataTrackingConn(conn net.Conn, dataRecv *atomic.Uint64) *dataTrackingConn {
func newDataTrackingConn(conn net.Conn, dataRecv *atomic.Uint64, elapsedTimeReading *atomic.Int64) *dataTrackingConn {
return &dataTrackingConn{
Conn: conn,
dataRecv: dataRecv,
Conn: conn,
dataRecv: dataRecv,
elapsedTimeReading: elapsedTimeReading,
}
}

type dataTrackingConn struct {
net.Conn
dataRecv *atomic.Uint64
dataRecv *atomic.Uint64
elapsedTimeReading *atomic.Int64
}

func (c *dataTrackingConn) Read(b []byte) (int, error) {
startedReading := time.Now()
defer func() {
c.elapsedTimeReading.Add(time.Since(startedReading).Milliseconds())
}()
n, err := c.Conn.Read(b)
c.dataRecv.Add(uint64(n))
return n, err
Expand Down

0 comments on commit 457c2e7

Please sign in to comment.