Skip to content

Commit

Permalink
fix: p2p.Peer restart (#688)
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadiuszos4chain authored Dec 3, 2024
1 parent 44f0928 commit ddeca39
Showing 1 changed file with 31 additions and 15 deletions.
46 changes: 31 additions & 15 deletions internal/p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,10 @@ func NewPeer(logger *slog.Logger, msgHandler MessageHandlerI, address string, ne
),
)

ctx, cancelFn := context.WithCancel(context.Background())

p := &Peer{
dial: net.Dial,
execCtx: ctx,
cancelExecCtx: cancelFn,
l: l,
mh: msgHandler,
dial: net.Dial,
l: l,
mh: msgHandler,

address: address,
network: network,
Expand Down Expand Up @@ -120,7 +116,9 @@ func (p *Peer) Restart() bool {
defer p.startMu.Unlock()

p.l.Info("Restarting")
p.disconnect()
if p.connected.Load() {
p.disconnect()
}

return p.connect()
}
Expand All @@ -129,6 +127,10 @@ func (p *Peer) Shutdown() {
p.startMu.Lock()
defer p.startMu.Unlock()

if !p.connected.Load() {
return
}

p.l.Info("Shutting down")
p.disconnect()
p.l.Info("Shutdown complete")
Expand All @@ -152,6 +154,10 @@ func (p *Peer) String() string {
func (p *Peer) connect() bool {
p.l.Info("Connecting")

ctx, cancelFn := context.WithCancel(context.Background())
p.execCtx = ctx
p.cancelExecCtx = cancelFn

lc, err := p.dial("tcp", p.address)
if err != nil {
p.l.Error("Failed to dial node",
Expand Down Expand Up @@ -316,6 +322,9 @@ func (p *Peer) keepAlive() {
go func() {
defer p.execWg.Done()

p.l.Debug("Start keep-alive")
defer p.l.Debug("Stop keep-alive")

t := time.NewTicker(p.pingInterval)
defer t.Stop()

Expand All @@ -342,6 +351,8 @@ func (p *Peer) healthMonitor() {
go func() {
defer p.execWg.Done()

p.l.Debug("Start health monitor")
defer p.l.Debug("Stop health monitor")
// if no ping/pong signal is received for certain amount of time, mark peer as unhealthy and disconnect
t := time.NewTicker(p.healthThreshold)
defer t.Stop()
Expand Down Expand Up @@ -370,9 +381,12 @@ func (p *Peer) disconnect() {

p.cancelExecCtx()
p.execWg.Wait()

_ = p.lConn.Close()

p.lConn = nil
p.execCtx = nil
p.cancelExecCtx = nil

p.connected.Store(false)
p.l.Info("Disconnected")
}
Expand All @@ -396,6 +410,7 @@ func (p *Peer) listenForMessages() {
go func() {
l := p.l
l.Debug("Starting read handler")
defer l.Debug("Shutting down read handler")
defer p.execWg.Done()

reader := bufio.NewReader(&io.LimitedReader{R: p.lConn, N: p.maxMsgSize})
Expand All @@ -404,7 +419,6 @@ func (p *Peer) listenForMessages() {
msg, err := readWireMsg(p.execCtx, reader, wire.ProtocolVersion, p.network)
if err != nil {
if errors.Is(err, context.Canceled) {
l.Debug("Shutting down read handler")
return
}

Expand All @@ -430,17 +444,17 @@ func (p *Peer) listenForMessages() {
continue
}

l.Debug("Received", slogUpperString(commandKey, cmd))
l.Log(context.Background(), slogLvlTrace, "Received", slogUpperString(commandKey, cmd))
p.aliveCh <- struct{}{}
p.writeCh <- wire.NewMsgPong(ping.Nonce) // are we sure it should go with write channel not beside?

case wire.CmdPong:
l.Debug("Received", slogUpperString(commandKey, cmd))
l.Log(context.Background(), slogLvlTrace, "Received", slogUpperString(commandKey, cmd))
p.aliveCh <- struct{}{}

// pass message to client
default:
l.Debug("Received", slogUpperString(commandKey, cmd))
l.Log(context.Background(), slogLvlTrace, "Received", slogUpperString(commandKey, cmd))
p.mh.OnReceive(msg, p)
}
}
Expand All @@ -454,12 +468,12 @@ func (p *Peer) sendMessages(n uint8) {
l := p.l.With(slog.Int("instance", int(n)))

l.Debug("Starting write handler")
defer l.Debug("Shutting down write handler")
defer p.execWg.Done()

for {
select {
case <-p.execCtx.Done():
l.Debug("Shutting down write handler")
return

case msg := <-p.writeCh:
Expand All @@ -477,7 +491,7 @@ func (p *Peer) sendMessages(n uint8) {
return
}

l.Debug("Sent", slogUpperString(commandKey, msg.Command()))
l.Log(context.Background(), slogLvlTrace, "Sent", slogUpperString(commandKey, msg.Command()))
// let client react on sending msg
p.mh.OnSend(msg, p)
}
Expand Down Expand Up @@ -511,3 +525,5 @@ func readWireMsg(ctx context.Context, r io.Reader, pver uint32, bsvnet wire.Bitc
func slogUpperString(key, val string) slog.Attr {
return slog.String(key, strings.ToUpper(val))
}

const slogLvlTrace slog.Level = slog.LevelDebug - 4

0 comments on commit ddeca39

Please sign in to comment.