Skip to content

Commit

Permalink
Merge pull request #7 from libsv/fix/retry-read
Browse files Browse the repository at this point in the history
retry reading message with reconnect
  • Loading branch information
boecklim authored Jan 8, 2024
2 parents 6f6e8ad + 224c38c commit 8e255ec
Showing 1 changed file with 36 additions and 11 deletions.
47 changes: 36 additions & 11 deletions Peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ const (
sentMsg = "Sent"
receivedMsg = "Recv"

retryWriteMessageInterval = 10 * time.Second
retryWriteMessageAttempts = 5
retryReadWriteMessageInterval = 10 * time.Second
retryReadWriteMessageAttempts = 5
)

type Block struct {
Expand Down Expand Up @@ -244,6 +244,37 @@ func (p *Peer) String() string {
return p.address
}

func (p *Peer) readRetry(r io.Reader, pver uint32, bsvnet wire.BitcoinNet) (wire.Message, error) {
policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryReadWriteMessageInterval), retryReadWriteMessageAttempts)
operation := func() (wire.Message, error) {
msg, _, err := wire.ReadMessage(r, pver, bsvnet)
if err != nil {
return nil, err
}
return msg, nil
}

notifyAndReconnect := func(err error, nextTry time.Duration) {
if errors.Is(err, io.EOF) {
p.logger.Error("Failed to read message: EOF", slog.Duration("next try", nextTry), slog.String(errKey, err.Error()))
} else {
p.logger.Error("Failed to read message", slog.Duration("next try", nextTry), slog.String(errKey, err.Error()))
}

err = p.connect()
if err != nil {
p.logger.Error("Failed to reconnect", slog.Duration("next try", nextTry), slog.String(errKey, err.Error()))
}
}

msg, err := backoff.RetryNotifyWithData(operation, policy, notifyAndReconnect)
if err != nil {
return nil, err
}

return msg, nil
}

func (p *Peer) readHandler() {
readConn := p.readConn

Expand All @@ -254,15 +285,9 @@ func (p *Peer) readHandler() {

reader := bufio.NewReader(&io.LimitedReader{R: readConn, N: p.maximumMessageSize})
for {
msg, b, err := wire.ReadMessage(reader, wire.ProtocolVersion, p.network)
msg, err := p.readRetry(reader, wire.ProtocolVersion, p.network)
if err != nil {
if errors.Is(err, io.EOF) {
p.logger.Error("failed to read message: EOF", slog.Int("bytes", len(b)), slog.String("rawMessage", string(b)), slog.String(errKey, err.Error()))
p.disconnect()
break
}

p.logger.Error("failed to read message", slog.Int("bytes", len(b)), slog.String("rawMessage", string(b)), slog.String(errKey, err.Error()))
p.logger.Error("failed to read", slog.String(errKey, err.Error()))
continue
}

Expand Down Expand Up @@ -488,7 +513,7 @@ func (p *Peer) sendDataBatch(batch []*chainhash.Hash) {
}

func (p *Peer) writeRetry(msg wire.Message) error {
policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryWriteMessageInterval), retryWriteMessageAttempts)
policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryReadWriteMessageInterval), retryReadWriteMessageAttempts)

operation := func() error {
return wire.WriteMessage(p.writeConn, msg, wire.ProtocolVersion, p.network)
Expand Down

0 comments on commit 8e255ec

Please sign in to comment.