From 44eb44f92896cbdb81d353e31560a38970487278 Mon Sep 17 00:00:00 2001 From: Michael Boeckli Date: Wed, 15 May 2024 14:12:07 +0200 Subject: [PATCH] BAARC-148: Separate reconnect function --- peer.go | 75 +++++++++++++++++++++++++++------------------------------ 1 file changed, 36 insertions(+), 39 deletions(-) diff --git a/peer.go b/peer.go index fcf403c..4b50e97 100644 --- a/peer.go +++ b/peer.go @@ -74,18 +74,18 @@ type Peer struct { userAgentName *string userAgentVersion *string retryReadWriteMessageInterval time.Duration - cancelReadHandler context.CancelFunc - cancelWriteHandler context.CancelFunc - cancelReconnecting context.CancelFunc - cancelAll context.CancelFunc + + ctx context.Context + + cancelReadHandler context.CancelFunc + cancelWriteHandler context.CancelFunc + cancelAll context.CancelFunc readerWg *sync.WaitGroup writerWg *sync.WaitGroup reconnectingWg *sync.WaitGroup pingHandlerWg *sync.WaitGroup healthMonitorWg *sync.WaitGroup - - ctx context.Context } // NewPeer returns a new bitcoin peer for the provided address and configuration. @@ -152,37 +152,8 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw } // reconnect if disconnected, but only on outgoing connections - reconnectCtx, cancelReconnect := context.WithCancel(ctx) - p.cancelReconnecting = cancelReconnect - p.reconnectingWg.Add(1) - go func(funcCtx context.Context) { - defer func() { - p.reconnectingWg.Done() - }() - connectErr := p.connectAndStartReadWriteHandlers(funcCtx) - if connectErr != nil { - p.logger.Warn("Failed to connect to peer", slog.String(errKey, connectErr.Error())) - } - - for { - select { - case <-time.NewTicker(reconnectInterval).C: - if p.Connected() || p.Connecting() { - continue - } - p.logger.Info("Reconnecting") - - connectErr = p.connectAndStartReadWriteHandlers(funcCtx) - if connectErr != nil { - p.logger.Warn("Failed to connect to peer", slog.String(errKey, connectErr.Error())) - continue - } - case <-reconnectCtx.Done(): - return - } - } - }(ctx) + go p.reconnect(ctx) return p, nil } @@ -193,6 +164,34 @@ func (p *Peer) disconnectLock() { p.disconnect() } +func (p *Peer) reconnect(ctx context.Context) { + defer func() { + p.reconnectingWg.Done() + }() + connectErr := p.connectAndStartReadWriteHandlers(ctx) + if connectErr != nil { + p.logger.Warn("Failed to connect to peer", slog.String(errKey, connectErr.Error())) + } + + for { + select { + case <-time.NewTicker(reconnectInterval).C: + if p.Connected() || p.Connecting() { + continue + } + p.logger.Info("Reconnecting") + + connectErr = p.connectAndStartReadWriteHandlers(ctx) + if connectErr != nil { + p.logger.Warn("Failed to connect to peer", slog.String(errKey, connectErr.Error())) + continue + } + case <-p.ctx.Done(): + return + } + } +} + func (p *Peer) disconnect() { if p.readConn != nil { _ = p.readConn.Close() @@ -848,10 +847,8 @@ func (p *Peer) stopWriteHandler() { } func (p *Peer) Shutdown() { - p.cancelReconnecting() - p.reconnectingWg.Wait() - p.cancelAll() + p.reconnectingWg.Wait() p.healthMonitorWg.Wait() p.pingHandlerWg.Wait()