From 9f59c7ea682e9029c62b1374ba3e4f7bb613ecce Mon Sep 17 00:00:00 2001 From: Anuragkillswitch <70265851+Anuragkillswitch@users.noreply.github.com> Date: Fri, 14 Feb 2025 23:50:49 +0530 Subject: [PATCH] feat: async to sync --- pipe.go | 41 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/pipe.go b/pipe.go index adf9a5c4..f9dea8b6 100644 --- a/pipe.go +++ b/pipe.go @@ -323,7 +323,7 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg p.background() } if p.timeout > 0 && p.pinggap > 0 { - go p.backgroundPing() + p.schedulePing(atomic.LoadInt32(&p.recvs)) } } return p, nil @@ -659,6 +659,45 @@ func (p *pipe) backgroundPing() { } } +func (p *pipe) schedulePing(prevRecv int32) { + if p.timeout <= 0 || p.pinggap <= 0 { + return + } + + time.AfterFunc(p.pinggap, func() { + if atomic.LoadInt32(&p.blcksig) != 0 || + (atomic.LoadInt32(&p.state) == 0 && atomic.LoadInt32(&p.waits) != 0) { + p.schedulePing(prevRecv) // Reschedule without sending PING + return + } + + recv := atomic.LoadInt32(&p.recvs) + if recv != prevRecv { + p.schedulePing(recv) // Activity detected, reset the cycle + return + } + + ch := make(chan error, 1) + tm := time.NewTimer(p.timeout) + go func() { ch <- p.Do(context.Background(), cmds.PingCmd).NonRedisError() }() + + select { + case <-p.close: + return // Stop if the connection is closing + case <-tm.C: + p._exit(os.ErrDeadlineExceeded) + return + case err := <-ch: + tm.Stop() + if err == nil || atomic.LoadInt32(&p.blcksig) != 0 { + p.schedulePing(recv) // No error, reschedule + } else { + p._exit(err) + } + } + }) +} + func (p *pipe) handlePush(values []RedisMessage) (reply bool, unsubscribe bool) { if len(values) < 2 { return