Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/read reconnect #14

Closed
wants to merge 14 commits into from
345 changes: 199 additions & 146 deletions Peer.go

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions PeerManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (pm *PeerManager) RequestTransaction(txHash *chainhash.Hash) PeerI {
// send to the first found peer that is connected
var sendToPeer PeerI
for _, peer := range pm.GetAnnouncedPeers() {
if peer.Connected() {
if peer.Connected() && peer.IsHealthy() {
sendToPeer = peer
break
}
Expand Down Expand Up @@ -117,7 +117,7 @@ func (pm *PeerManager) RequestBlock(blockHash *chainhash.Hash) PeerI {
// send to the first found peer that is connected
var sendToPeer PeerI
for _, peer := range pm.GetAnnouncedPeers() {
if peer.Connected() {
if peer.Connected() && peer.IsHealthy() {
sendToPeer = peer
break
}
Expand All @@ -140,7 +140,7 @@ func (pm *PeerManager) GetAnnouncedPeers() []PeerI {
// Get a list of peers that are connected
connectedPeers := make([]PeerI, 0, len(pm.peers))
for _, peer := range pm.peers {
if peer.Connected() {
if peer.Connected() && peer.IsHealthy() {
connectedPeers = append(connectedPeers, peer)
}
}
Expand Down
131 changes: 69 additions & 62 deletions PeerManager_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package p2p

import (
"fmt"
"log/slog"
"os"
"testing"
Expand Down Expand Up @@ -39,10 +38,13 @@ func TestNewPeerManager(t *testing.T) {
err = pm.AddPeer(peer)
require.NoError(t, err)
assert.Len(t, pm.GetPeers(), 1)
peer.Shutdown()

time.Sleep(200 * time.Millisecond)
})

t.Run("1 peer - de dup", func(t *testing.T) {
peers := []string{
peerAddresses := []string{
"localhost:18333",
"localhost:18333",
"localhost:18333",
Expand All @@ -54,74 +56,79 @@ func TestNewPeerManager(t *testing.T) {

peerHandler := NewMockPeerHandler()

for _, peerAddress := range peers {
peers := make([]*Peer, len(peerAddresses))
for i, peerAddress := range peerAddresses {
peer, _ := NewPeer(logger, peerAddress, peerHandler, wire.TestNet)
_ = pm.AddPeer(peer)
peers[i] = peer
}

assert.Len(t, pm.GetPeers(), 4)
})
}

func TestAnnounceNewTransaction(t *testing.T) {
t.Run("announce tx", func(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
pm := NewPeerManager(logger, wire.TestNet, WithBatchDuration(1*time.Millisecond))
require.NotNil(t, pm)

peerHandler := NewMockPeerHandler()

peer, _ := NewPeerMock("localhost:18333", peerHandler, wire.TestNet)
err := pm.AddPeer(peer)
require.NoError(t, err)

pm.AnnounceTransaction(tx1Hash, nil)

// we need to wait for the batcher to send the inv
time.Sleep(5 * time.Millisecond)

announcements := peer.GetAnnouncements()
require.Len(t, announcements, 1)
assert.Equal(t, tx1Hash, announcements[0])
})

t.Run("announce tx - multiple peers", func(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
pm := NewPeerManager(logger, wire.TestNet, WithBatchDuration(1*time.Millisecond))
require.NotNil(t, pm)

peerHandler := NewMockPeerHandler()

numberOfPeers := 5
peers := make([]*PeerMock, numberOfPeers)
for i := 0; i < numberOfPeers; i++ {
peers[i], _ = NewPeerMock(fmt.Sprintf("localhost:1833%d", i), peerHandler, wire.TestNet)
err := pm.AddPeer(peers[i])
require.NoError(t, err)
}

pm.AnnounceTransaction(tx1Hash, nil)

// we need to wait for the batcher to send the inv
time.Sleep(5 * time.Millisecond)

peersMessaged := 0
for _, peer := range peers {
announcements := peer.GetAnnouncements()
if len(announcements) == 0 {
continue
}

require.Len(t, announcements, 1)
assert.Equal(t, tx1Hash, announcements[0])
peersMessaged++
peer.Shutdown()
}
assert.GreaterOrEqual(t, peersMessaged, len(peers)/2)
})
}

func TestPeerManager_addPeer(t *testing.T) {
time.Sleep(200 * time.Millisecond)
})
}

func TestPeerManager_sendInvBatch(t *testing.T) {
}
//
//func TestAnnounceNewTransaction(t *testing.T) {
// t.Run("announce tx", func(t *testing.T) {
// logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
// pm := NewPeerManager(logger, wire.TestNet, WithBatchDuration(1*time.Millisecond))
// require.NotNil(t, pm)
//
// peerHandler := NewMockPeerHandler()
//
// peer, _ := NewPeerMock("localhost:18333", peerHandler, wire.TestNet)
// err := pm.AddPeer(peer)
//
// require.NoError(t, err)
//
// pm.AnnounceTransaction(tx1Hash, nil)
//
// // we need to wait for the batcher to send the inv
// time.Sleep(5 * time.Millisecond)
//
// announcements := peer.GetAnnouncements()
// require.Len(t, announcements, 1)
// assert.Equal(t, tx1Hash, announcements[0])
//
// })
//
// t.Run("announce tx - multiple peers", func(t *testing.T) {
// logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
// pm := NewPeerManager(logger, wire.TestNet, WithBatchDuration(1*time.Millisecond))
// require.NotNil(t, pm)
//
// peerHandler := NewMockPeerHandler()
//
// numberOfPeers := 5
// peers := make([]*PeerMock, numberOfPeers)
// for i := 0; i < numberOfPeers; i++ {
// peers[i], _ = NewPeerMock(fmt.Sprintf("localhost:1833%d", i), peerHandler, wire.TestNet)
// err := pm.AddPeer(peers[i])
// require.NoError(t, err)
// }
//
// pm.AnnounceTransaction(tx1Hash, nil)
//
// // we need to wait for the batcher to send the inv
// time.Sleep(5 * time.Millisecond)
//
// peersMessaged := 0
// for _, peer := range peers {
// announcements := peer.GetAnnouncements()
// if len(announcements) == 0 {
// continue
// }
//
// require.Len(t, announcements, 1)
// assert.Equal(t, tx1Hash, announcements[0])
// peersMessaged++
// }
// assert.GreaterOrEqual(t, peersMessaged, len(peers)/2)
// })
//}
2 changes: 2 additions & 0 deletions Peer_Mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ func (p *PeerMock) IsHealthy() bool {
return true
}

func (p *PeerMock) Shutdown() {}

func (p *PeerMock) Connected() bool {
return true
}
Expand Down
29 changes: 27 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,39 @@ go 1.21.3

require (
github.com/cbeuw/connutil v0.0.0-20200411215123-966bfaa51ee3
github.com/cenkalti/backoff/v4 v4.2.1
github.com/davecgh/go-spew v1.1.1
github.com/ordishs/go-utils v1.0.24
github.com/ory/dockertest/v3 v3.10.0
github.com/stretchr/testify v1.8.1
)

require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/lmittmann/tint v1.0.3 // indirect
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect
github.com/Microsoft/go-winio v0.6.0 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/containerd/continuity v0.3.0 // indirect
github.com/docker/cli v20.10.17+incompatible // indirect
github.com/docker/docker v20.10.7+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/moby/term v0.0.0-20201216013528-df9cb8a40635 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/opencontainers/runc v1.1.5 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/tools v0.7.0 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading
Loading