Skip to content

Commit

Permalink
Use cancelling context to stop read handler
Browse files Browse the repository at this point in the history
  • Loading branch information
boecklim committed Mar 20, 2024
1 parent 4609285 commit f64832b
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 43 deletions.
95 changes: 52 additions & 43 deletions Peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package p2p

import (
"bufio"
"context"
"encoding/hex"
"errors"
"fmt"
Expand Down Expand Up @@ -52,27 +53,26 @@ type Block struct {
}

type Peer struct {
address string
network wire.BitcoinNet
mu sync.RWMutex
readConn net.Conn
writeConn net.Conn
incomingConn net.Conn
dial func(network, address string) (net.Conn, error)
peerHandler PeerHandlerI
writeChan chan wire.Message
quit chan struct{}
pingPongAlive chan struct{}
logger *slog.Logger
sentVerAck atomic.Bool
receivedVerAck atomic.Bool
batchDelay time.Duration
invBatcher *batcher.Batcher[chainhash.Hash]
dataBatcher *batcher.Batcher[chainhash.Hash]
maximumMessageSize int64
isHealthy bool
quitReadHandler chan struct{}
quitReadHandlerComplete chan struct{}
address string
network wire.BitcoinNet
mu sync.RWMutex
readConn net.Conn
writeConn net.Conn
incomingConn net.Conn
dial func(network, address string) (net.Conn, error)
peerHandler PeerHandlerI
writeChan chan wire.Message
quit chan struct{}
pingPongAlive chan struct{}
logger *slog.Logger
sentVerAck atomic.Bool
receivedVerAck atomic.Bool
batchDelay time.Duration
invBatcher *batcher.Batcher[chainhash.Hash]
dataBatcher *batcher.Batcher[chainhash.Hash]
maximumMessageSize int64
isHealthy bool
cancelReadHandler context.CancelFunc
}

// NewPeer returns a new bitcoin peer for the provided address and configuration.
Expand Down Expand Up @@ -254,8 +254,17 @@ func (p *Peer) String() string {
return p.address
}

func (p *Peer) readRetry(r io.Reader, pver uint32, bsvnet wire.BitcoinNet) (wire.Message, error) {
func (p *Peer) readRetry(ctx context.Context, r io.Reader, pver uint32, bsvnet wire.BitcoinNet) (wire.Message, error) {
policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryReadWriteMessageInterval), retryReadWriteMessageAttempts)

//ctx, cancel := context.WithCancel(context.Background())
//ctx := context.Background()
policyContext := backoff.WithContext(policy, ctx)

//p.mu.Lock()
//p.cancelReadHandler = cancel
//p.mu.Unlock()

operation := func() (wire.Message, error) {
msg, _, err := wire.ReadMessage(r, pver, bsvnet)
if err != nil {
Expand All @@ -272,7 +281,7 @@ func (p *Peer) readRetry(r io.Reader, pver uint32, bsvnet wire.BitcoinNet) (wire
}
}

msg, err := backoff.RetryNotifyWithData(operation, policy, notifyAndReconnect)
msg, err := backoff.RetryNotifyWithData(operation, policyContext, notifyAndReconnect)
if err != nil {
return nil, err
}
Expand All @@ -281,42 +290,45 @@ func (p *Peer) readRetry(r io.Reader, pver uint32, bsvnet wire.BitcoinNet) (wire
}

func (p *Peer) startReadHandler() {
p.quitReadHandler = make(chan struct{}, 10)
p.quitReadHandlerComplete = make(chan struct{}, 10)
ctx, cancel := context.WithCancel(context.Background())
p.cancelReadHandler = cancel

p.logger.Info("Starting read handler")

go func() {
go func(cancelCtx context.Context) {
defer func() {
p.logger.Info("Shutting down read handler")
}()

readConn := p.readConn
var msg wire.Message
var err error

if readConn == nil {
p.cancelReadHandler = nil
p.logger.Error("no connection")
return
}

go func() {
if p.quitReadHandlerComplete != nil {
p.quitReadHandlerComplete <- struct{}{}
}
p.logger.Info("Shutting down read handler")
}()

reader := bufio.NewReader(&io.LimitedReader{R: readConn, N: p.maximumMessageSize})
for {
select {
case <-p.quitReadHandler:
case <-cancelCtx.Done():
return
default:
msg, err := p.readRetry(reader, wire.ProtocolVersion, p.network)
msg, err = p.readRetry(cancelCtx, reader, wire.ProtocolVersion, p.network)
if err != nil {
if errors.Is(err, context.Canceled) {
p.logger.Info("Retrying to read cancelled")
return
}

p.logger.Error("Retrying to read failed", slog.String(errKey, err.Error()))

p.disconnect()

p.mu.Lock()
p.quitReadHandler = nil
p.quitReadHandlerComplete = nil
p.cancelReadHandler = nil
p.mu.Unlock()

return
Expand Down Expand Up @@ -447,7 +459,7 @@ func (p *Peer) startReadHandler() {
}
}
}
}()
}(ctx)
}

func (p *Peer) handleGetDataMsg(dataMsg *wire.MsgGetData, logger *slog.Logger) {
Expand Down Expand Up @@ -693,10 +705,7 @@ func (p *Peer) IsHealthy() bool {
}

func (p *Peer) Shutdown() {
p.mu.Lock()
defer p.mu.Unlock()
if p.quitReadHandler != nil {
p.quitReadHandler <- struct{}{}
<-p.quitReadHandlerComplete
if p.cancelReadHandler != nil {
p.cancelReadHandler()
}
}
2 changes: 2 additions & 0 deletions peer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ func TestNewPeer(t *testing.T) {
time.Sleep(reconnectInterval + 2*time.Second)
require.True(t, peer.Connected())

//err = dockerClient.StopContainer(resource.Container.ID, 10)
require.NoError(t, err)
peer.Shutdown()
})
}

0 comments on commit f64832b

Please sign in to comment.