From f64832bf1caee2093e85892499a562c3ef138821 Mon Sep 17 00:00:00 2001 From: Michael Boeckli Date: Wed, 20 Mar 2024 09:46:57 +0100 Subject: [PATCH] Use cancelling context to stop read handler --- Peer.go | 95 ++++++++++++++++++++++------------------ peer_integration_test.go | 2 + 2 files changed, 54 insertions(+), 43 deletions(-) diff --git a/Peer.go b/Peer.go index 7498c51..047acb5 100644 --- a/Peer.go +++ b/Peer.go @@ -2,6 +2,7 @@ package p2p import ( "bufio" + "context" "encoding/hex" "errors" "fmt" @@ -52,27 +53,26 @@ type Block struct { } type Peer struct { - address string - network wire.BitcoinNet - mu sync.RWMutex - readConn net.Conn - writeConn net.Conn - incomingConn net.Conn - dial func(network, address string) (net.Conn, error) - peerHandler PeerHandlerI - writeChan chan wire.Message - quit chan struct{} - pingPongAlive chan struct{} - logger *slog.Logger - sentVerAck atomic.Bool - receivedVerAck atomic.Bool - batchDelay time.Duration - invBatcher *batcher.Batcher[chainhash.Hash] - dataBatcher *batcher.Batcher[chainhash.Hash] - maximumMessageSize int64 - isHealthy bool - quitReadHandler chan struct{} - quitReadHandlerComplete chan struct{} + address string + network wire.BitcoinNet + mu sync.RWMutex + readConn net.Conn + writeConn net.Conn + incomingConn net.Conn + dial func(network, address string) (net.Conn, error) + peerHandler PeerHandlerI + writeChan chan wire.Message + quit chan struct{} + pingPongAlive chan struct{} + logger *slog.Logger + sentVerAck atomic.Bool + receivedVerAck atomic.Bool + batchDelay time.Duration + invBatcher *batcher.Batcher[chainhash.Hash] + dataBatcher *batcher.Batcher[chainhash.Hash] + maximumMessageSize int64 + isHealthy bool + cancelReadHandler context.CancelFunc } // NewPeer returns a new bitcoin peer for the provided address and configuration. @@ -254,8 +254,17 @@ func (p *Peer) String() string { return p.address } -func (p *Peer) readRetry(r io.Reader, pver uint32, bsvnet wire.BitcoinNet) (wire.Message, error) { +func (p *Peer) readRetry(ctx context.Context, r io.Reader, pver uint32, bsvnet wire.BitcoinNet) (wire.Message, error) { policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryReadWriteMessageInterval), retryReadWriteMessageAttempts) + + //ctx, cancel := context.WithCancel(context.Background()) + //ctx := context.Background() + policyContext := backoff.WithContext(policy, ctx) + + //p.mu.Lock() + //p.cancelReadHandler = cancel + //p.mu.Unlock() + operation := func() (wire.Message, error) { msg, _, err := wire.ReadMessage(r, pver, bsvnet) if err != nil { @@ -272,7 +281,7 @@ func (p *Peer) readRetry(r io.Reader, pver uint32, bsvnet wire.BitcoinNet) (wire } } - msg, err := backoff.RetryNotifyWithData(operation, policy, notifyAndReconnect) + msg, err := backoff.RetryNotifyWithData(operation, policyContext, notifyAndReconnect) if err != nil { return nil, err } @@ -281,42 +290,45 @@ func (p *Peer) readRetry(r io.Reader, pver uint32, bsvnet wire.BitcoinNet) (wire } func (p *Peer) startReadHandler() { - p.quitReadHandler = make(chan struct{}, 10) - p.quitReadHandlerComplete = make(chan struct{}, 10) + ctx, cancel := context.WithCancel(context.Background()) + p.cancelReadHandler = cancel p.logger.Info("Starting read handler") - go func() { + go func(cancelCtx context.Context) { + defer func() { + p.logger.Info("Shutting down read handler") + }() readConn := p.readConn + var msg wire.Message + var err error if readConn == nil { + p.cancelReadHandler = nil p.logger.Error("no connection") return } - go func() { - if p.quitReadHandlerComplete != nil { - p.quitReadHandlerComplete <- struct{}{} - } - p.logger.Info("Shutting down read handler") - }() - reader := bufio.NewReader(&io.LimitedReader{R: readConn, N: p.maximumMessageSize}) for { select { - case <-p.quitReadHandler: + case <-cancelCtx.Done(): return default: - msg, err := p.readRetry(reader, wire.ProtocolVersion, p.network) + msg, err = p.readRetry(cancelCtx, reader, wire.ProtocolVersion, p.network) if err != nil { + if errors.Is(err, context.Canceled) { + p.logger.Info("Retrying to read cancelled") + return + } + p.logger.Error("Retrying to read failed", slog.String(errKey, err.Error())) p.disconnect() p.mu.Lock() - p.quitReadHandler = nil - p.quitReadHandlerComplete = nil + p.cancelReadHandler = nil p.mu.Unlock() return @@ -447,7 +459,7 @@ func (p *Peer) startReadHandler() { } } } - }() + }(ctx) } func (p *Peer) handleGetDataMsg(dataMsg *wire.MsgGetData, logger *slog.Logger) { @@ -693,10 +705,7 @@ func (p *Peer) IsHealthy() bool { } func (p *Peer) Shutdown() { - p.mu.Lock() - defer p.mu.Unlock() - if p.quitReadHandler != nil { - p.quitReadHandler <- struct{}{} - <-p.quitReadHandlerComplete + if p.cancelReadHandler != nil { + p.cancelReadHandler() } } diff --git a/peer_integration_test.go b/peer_integration_test.go index 4a401d3..66a4f43 100644 --- a/peer_integration_test.go +++ b/peer_integration_test.go @@ -112,6 +112,8 @@ func TestNewPeer(t *testing.T) { time.Sleep(reconnectInterval + 2*time.Second) require.True(t, peer.Connected()) + //err = dockerClient.StopContainer(resource.Container.ID, 10) + require.NoError(t, err) peer.Shutdown() }) }