Skip to content

Commit

Permalink
BAARC-148: Separate reconnect function
Browse files Browse the repository at this point in the history
  • Loading branch information
boecklim committed May 15, 2024
1 parent a811d1a commit 44eb44f
Showing 1 changed file with 36 additions and 39 deletions.
75 changes: 36 additions & 39 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,18 @@ type Peer struct {
userAgentName *string
userAgentVersion *string
retryReadWriteMessageInterval time.Duration
cancelReadHandler context.CancelFunc
cancelWriteHandler context.CancelFunc
cancelReconnecting context.CancelFunc
cancelAll context.CancelFunc

ctx context.Context

cancelReadHandler context.CancelFunc
cancelWriteHandler context.CancelFunc
cancelAll context.CancelFunc

readerWg *sync.WaitGroup
writerWg *sync.WaitGroup
reconnectingWg *sync.WaitGroup
pingHandlerWg *sync.WaitGroup
healthMonitorWg *sync.WaitGroup

ctx context.Context
}

// NewPeer returns a new bitcoin peer for the provided address and configuration.
Expand Down Expand Up @@ -152,37 +152,8 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw
}

// reconnect if disconnected, but only on outgoing connections
reconnectCtx, cancelReconnect := context.WithCancel(ctx)
p.cancelReconnecting = cancelReconnect

p.reconnectingWg.Add(1)
go func(funcCtx context.Context) {
defer func() {
p.reconnectingWg.Done()
}()
connectErr := p.connectAndStartReadWriteHandlers(funcCtx)
if connectErr != nil {
p.logger.Warn("Failed to connect to peer", slog.String(errKey, connectErr.Error()))
}

for {
select {
case <-time.NewTicker(reconnectInterval).C:
if p.Connected() || p.Connecting() {
continue
}
p.logger.Info("Reconnecting")

connectErr = p.connectAndStartReadWriteHandlers(funcCtx)
if connectErr != nil {
p.logger.Warn("Failed to connect to peer", slog.String(errKey, connectErr.Error()))
continue
}
case <-reconnectCtx.Done():
return
}
}
}(ctx)
go p.reconnect(ctx)

return p, nil
}
Expand All @@ -193,6 +164,34 @@ func (p *Peer) disconnectLock() {
p.disconnect()
}

func (p *Peer) reconnect(ctx context.Context) {
defer func() {
p.reconnectingWg.Done()
}()
connectErr := p.connectAndStartReadWriteHandlers(ctx)
if connectErr != nil {
p.logger.Warn("Failed to connect to peer", slog.String(errKey, connectErr.Error()))
}

for {
select {
case <-time.NewTicker(reconnectInterval).C:
if p.Connected() || p.Connecting() {
continue
}
p.logger.Info("Reconnecting")

connectErr = p.connectAndStartReadWriteHandlers(ctx)
if connectErr != nil {
p.logger.Warn("Failed to connect to peer", slog.String(errKey, connectErr.Error()))
continue
}
case <-p.ctx.Done():
return
}
}
}

func (p *Peer) disconnect() {
if p.readConn != nil {
_ = p.readConn.Close()
Expand Down Expand Up @@ -848,10 +847,8 @@ func (p *Peer) stopWriteHandler() {
}

func (p *Peer) Shutdown() {
p.cancelReconnecting()
p.reconnectingWg.Wait()

p.cancelAll()
p.reconnectingWg.Wait()
p.healthMonitorWg.Wait()
p.pingHandlerWg.Wait()

Expand Down

0 comments on commit 44eb44f

Please sign in to comment.