From 224c38c36d8f7388ab55195552c0f768c3dc3375 Mon Sep 17 00:00:00 2001 From: Michael Boeckli Date: Thu, 4 Jan 2024 17:14:03 +0100 Subject: [PATCH] BAARC-97: retry reading message with reconnect --- Peer.go | 47 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/Peer.go b/Peer.go index e084486..274c1a4 100644 --- a/Peer.go +++ b/Peer.go @@ -38,8 +38,8 @@ const ( sentMsg = "Sent" receivedMsg = "Recv" - retryWriteMessageInterval = 10 * time.Second - retryWriteMessageAttempts = 5 + retryReadWriteMessageInterval = 10 * time.Second + retryReadWriteMessageAttempts = 5 ) type Block struct { @@ -244,6 +244,37 @@ func (p *Peer) String() string { return p.address } +func (p *Peer) readRetry(r io.Reader, pver uint32, bsvnet wire.BitcoinNet) (wire.Message, error) { + policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryReadWriteMessageInterval), retryReadWriteMessageAttempts) + operation := func() (wire.Message, error) { + msg, _, err := wire.ReadMessage(r, pver, bsvnet) + if err != nil { + return nil, err + } + return msg, nil + } + + notifyAndReconnect := func(err error, nextTry time.Duration) { + if errors.Is(err, io.EOF) { + p.logger.Error("Failed to read message: EOF", slog.Duration("next try", nextTry), slog.String(errKey, err.Error())) + } else { + p.logger.Error("Failed to read message", slog.Duration("next try", nextTry), slog.String(errKey, err.Error())) + } + + err = p.connect() + if err != nil { + p.logger.Error("Failed to reconnect", slog.Duration("next try", nextTry), slog.String(errKey, err.Error())) + } + } + + msg, err := backoff.RetryNotifyWithData(operation, policy, notifyAndReconnect) + if err != nil { + return nil, err + } + + return msg, nil +} + func (p *Peer) readHandler() { readConn := p.readConn @@ -254,15 +285,9 @@ func (p *Peer) readHandler() { reader := bufio.NewReader(&io.LimitedReader{R: readConn, N: p.maximumMessageSize}) for { - msg, b, err := wire.ReadMessage(reader, wire.ProtocolVersion, p.network) + msg, err := p.readRetry(reader, wire.ProtocolVersion, p.network) if err != nil { - if errors.Is(err, io.EOF) { - p.logger.Error("failed to read message: EOF", slog.Int("bytes", len(b)), slog.String("rawMessage", string(b)), slog.String(errKey, err.Error())) - p.disconnect() - break - } - - p.logger.Error("failed to read message", slog.Int("bytes", len(b)), slog.String("rawMessage", string(b)), slog.String(errKey, err.Error())) + p.logger.Error("failed to read", slog.String(errKey, err.Error())) continue } @@ -488,7 +513,7 @@ func (p *Peer) sendDataBatch(batch []*chainhash.Hash) { } func (p *Peer) writeRetry(msg wire.Message) error { - policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryWriteMessageInterval), retryWriteMessageAttempts) + policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryReadWriteMessageInterval), retryReadWriteMessageAttempts) operation := func() error { return wire.WriteMessage(p.writeConn, msg, wire.ProtocolVersion, p.network)