Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use timer for background PINGs instead of a long-live goroutine #760

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
41 changes: 40 additions & 1 deletion pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg
p.background()
}
if p.timeout > 0 && p.pinggap > 0 {
go p.backgroundPing()
p.schedulePing(atomic.LoadInt32(&p.recvs))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any idea how we should benchmark this? @rueian

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a clear idea but I think we can first do a separate simulation of many <-ticker.C vs many time.AfterFunc to see if the latter really helps for memory usage and better scheduling on other goroutines. The simulation doesn't need to have rueidis involved.

}
}
return p, nil
Expand Down Expand Up @@ -659,6 +659,45 @@ func (p *pipe) backgroundPing() {
}
}

func (p *pipe) schedulePing(prevRecv int32) {
if p.timeout <= 0 || p.pinggap <= 0 {
return
}

time.AfterFunc(p.pinggap, func() {
if atomic.LoadInt32(&p.blcksig) != 0 ||
(atomic.LoadInt32(&p.state) == 0 && atomic.LoadInt32(&p.waits) != 0) {
p.schedulePing(prevRecv) // Reschedule without sending PING
return
}

recv := atomic.LoadInt32(&p.recvs)
if recv != prevRecv {
p.schedulePing(recv) // Activity detected, reset the cycle
return
}

ch := make(chan error, 1)
tm := time.NewTimer(p.timeout)
go func() { ch <- p.Do(context.Background(), cmds.PingCmd).NonRedisError() }()

select {
case <-p.close:
return // Stop if the connection is closing
case <-tm.C:
p._exit(os.ErrDeadlineExceeded)
return
case err := <-ch:
tm.Stop()
if err == nil || atomic.LoadInt32(&p.blcksig) != 0 {
p.schedulePing(recv) // No error, reschedule
} else {
p._exit(err)
}
}
})
}

func (p *pipe) handlePush(values []RedisMessage) (reply bool, unsubscribe bool) {
if len(values) < 2 {
return
Expand Down
Loading