Skip to content

Commit

Permalink
Merge branch 'main' into feat/calculating-consumption
Browse files Browse the repository at this point in the history
  • Loading branch information
Ja7ad authored Oct 19, 2024
2 parents db12d97 + 8f6c455 commit c6cffcb
Show file tree
Hide file tree
Showing 32 changed files with 6,176 additions and 4,045 deletions.
16 changes: 8 additions & 8 deletions sync/firewall/firewall.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ func (f *Firewall) OpenStreamBundle(r io.Reader, from peer.ID) (*bundle.Bundle,

func (f *Firewall) openBundle(r io.Reader, from peer.ID) (*bundle.Bundle, error) {
f.peerSet.UpdateLastReceived(from)
f.peerSet.IncreaseReceivedBundlesCounter(from)

p := f.peerSet.GetPeer(from)
if p.Status.IsBanned() {
Expand All @@ -131,31 +130,32 @@ func (f *Firewall) openBundle(r io.Reader, from peer.ID) (*bundle.Bundle, error)
}
}

bdl, err := f.decodeBundle(r, from)
bdl, bytesRead, err := f.decodeBundle(r)
if err != nil {
f.peerSet.IncreaseInvalidBundlesCounter(from)
f.peerSet.UpdateInvalidMetric(from, int64(bytesRead))

return nil, err
}

if err := f.checkBundle(bdl); err != nil {
f.peerSet.IncreaseInvalidBundlesCounter(from)
f.peerSet.UpdateInvalidMetric(from, int64(bytesRead))

return bdl, err
}

f.peerSet.UpdateReceivedMetric(from, bdl.Message.Type(), int64(bytesRead))

return bdl, nil
}

func (f *Firewall) decodeBundle(r io.Reader, pid peer.ID) (*bundle.Bundle, error) {
func (*Firewall) decodeBundle(r io.Reader) (*bundle.Bundle, int, error) {
bdl := new(bundle.Bundle)
bytesRead, err := bdl.Decode(r)
if err != nil {
return nil, err
return nil, bytesRead, err
}
f.peerSet.IncreaseReceivedBytesCounter(pid, bdl.Message.Type(), int64(bytesRead))

return bdl, nil
return bdl, bytesRead, nil
}

func (f *Firewall) checkBundle(bdl *bundle.Bundle) error {
Expand Down
4 changes: 2 additions & 2 deletions sync/firewall/firewall_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ func TestDecodeBundles(t *testing.T) {
}

p := td.firewall.peerSet.GetPeer(td.unknownPeerID)
assert.Equal(t, 5, p.ReceivedBundles)
assert.Equal(t, 4, p.InvalidBundles)
assert.Equal(t, int64(1), p.Metric.TotalReceived.Bundles)
assert.Equal(t, int64(4), p.Metric.TotalInvalid.Bundles)
}

func TestGossipMessage(t *testing.T) {
Expand Down
54 changes: 54 additions & 0 deletions sync/peerset/peer/metric/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package metric

import "github.com/pactus-project/pactus/sync/bundle/message"

type Counter struct {
Bytes int64
Bundles int64
}

type Metric struct {
TotalInvalid Counter
TotalSent Counter
TotalReceived Counter
MessageSent map[message.Type]*Counter
MessageReceived map[message.Type]*Counter
}

func NewMetric() Metric {
return Metric{
MessageSent: make(map[message.Type]*Counter),
MessageReceived: make(map[message.Type]*Counter),
}
}

func (m *Metric) UpdateSentMetric(msgType message.Type, bytes int64) {
m.TotalSent.Bundles++
m.TotalSent.Bytes += bytes

_, ok := m.MessageSent[msgType]
if !ok {
m.MessageSent[msgType] = &Counter{}
}

m.MessageSent[msgType].Bundles++
m.MessageSent[msgType].Bytes += bytes
}

func (m *Metric) UpdateReceivedMetric(msgType message.Type, bytes int64) {
m.TotalReceived.Bundles++
m.TotalReceived.Bytes += bytes

_, ok := m.MessageReceived[msgType]
if !ok {
m.MessageReceived[msgType] = &Counter{}
}

m.MessageReceived[msgType].Bundles++
m.MessageReceived[msgType].Bytes += bytes
}

func (m *Metric) UpdateInvalidMetric(bytes int64) {
m.TotalInvalid.Bundles++
m.TotalInvalid.Bytes += bytes
}
46 changes: 46 additions & 0 deletions sync/peerset/peer/metric/metric_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package metric

import (
"testing"

"github.com/pactus-project/pactus/sync/bundle/message"
"github.com/stretchr/testify/assert"
)

func TestUpdateSentMetric(t *testing.T) {
metric := NewMetric()

testMsgType := message.Type(1)

metric.UpdateSentMetric(testMsgType, 100)

assert.Equal(t, int64(1), metric.TotalSent.Bundles)
assert.Equal(t, int64(100), metric.TotalSent.Bytes)

assert.NotNil(t, metric.MessageSent[testMsgType])
assert.Equal(t, int64(1), metric.MessageSent[testMsgType].Bundles)
assert.Equal(t, int64(100), metric.MessageSent[testMsgType].Bytes)
}

func TestUpdateReceivedMetric(t *testing.T) {
metric := NewMetric()

testMsgType := message.Type(2)

metric.UpdateReceivedMetric(testMsgType, 200)

assert.Equal(t, int64(1), metric.TotalReceived.Bundles)
assert.Equal(t, int64(200), metric.TotalReceived.Bytes)

assert.NotNil(t, metric.MessageReceived[testMsgType])
assert.Equal(t, int64(1), metric.MessageReceived[testMsgType].Bundles)
assert.Equal(t, int64(200), metric.MessageReceived[testMsgType].Bytes)
}

func TestUpdateInvalidMetric(t *testing.T) {
metric := NewMetric()

metric.UpdateInvalidMetric(123)
assert.Equal(t, int64(1), metric.TotalInvalid.Bundles)
assert.Equal(t, int64(123), metric.TotalInvalid.Bytes)
}
10 changes: 3 additions & 7 deletions sync/peerset/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
lp2ppeer "github.com/libp2p/go-libp2p/core/peer"
"github.com/pactus-project/pactus/crypto/bls"
"github.com/pactus-project/pactus/crypto/hash"
"github.com/pactus-project/pactus/sync/bundle/message"
"github.com/pactus-project/pactus/sync/peerset/peer/metric"
"github.com/pactus-project/pactus/sync/peerset/peer/service"
"github.com/pactus-project/pactus/sync/peerset/peer/status"
)
Expand All @@ -27,21 +27,17 @@ type Peer struct {
LastReceived time.Time
LastBlockHash hash.Hash
Height uint32
ReceivedBundles int
InvalidBundles int
TotalSessions int
CompletedSessions int
ReceivedBytes map[message.Type]int64
SentBytes map[message.Type]int64
Metric metric.Metric
}

func NewPeer(peerID ID) *Peer {
return &Peer{
ConsensusKeys: make([]*bls.PublicKey, 0),
Status: status.StatusUnknown,
PeerID: peerID,
ReceivedBytes: make(map[message.Type]int64),
SentBytes: make(map[message.Type]int64),
Metric: metric.NewMetric(),
Protocols: make([]string, 0),
}
}
Expand Down
99 changes: 24 additions & 75 deletions sync/peerset/peer_set.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package peerset

import (
"maps"
"sync"
"time"

"github.com/pactus-project/pactus/crypto/bls"
"github.com/pactus-project/pactus/crypto/hash"
"github.com/pactus-project/pactus/sync/bundle/message"
"github.com/pactus-project/pactus/sync/peerset/peer"
"github.com/pactus-project/pactus/sync/peerset/peer/metric"
"github.com/pactus-project/pactus/sync/peerset/peer/service"
"github.com/pactus-project/pactus/sync/peerset/peer/status"
"github.com/pactus-project/pactus/sync/peerset/session"
Expand All @@ -18,23 +18,18 @@ import (
type PeerSet struct {
lk sync.RWMutex

peers map[peer.ID]*peer.Peer
sessionManager *session.Manager
totalSentBundles int
totalSentBytes int64
totalReceivedBytes int64
sentBytes map[message.Type]int64
receivedBytes map[message.Type]int64
startedAt time.Time
peers map[peer.ID]*peer.Peer
sessionManager *session.Manager
startedAt time.Time
metric metric.Metric
}

// NewPeerSet constructs a new PeerSet for managing peer information.
func NewPeerSet(sessionTimeout time.Duration) *PeerSet {
return &PeerSet{
peers: make(map[peer.ID]*peer.Peer),
sessionManager: session.NewManager(sessionTimeout),
sentBytes: make(map[message.Type]int64),
receivedBytes: make(map[message.Type]int64),
metric: metric.NewMetric(),
startedAt: time.Now(),
}
}
Expand Down Expand Up @@ -257,96 +252,43 @@ func (ps *PeerSet) UpdateLastReceived(pid peer.ID) {
p.LastReceived = time.Now()
}

func (ps *PeerSet) IncreaseReceivedBundlesCounter(pid peer.ID) {
func (ps *PeerSet) UpdateInvalidMetric(pid peer.ID, bytes int64) {
ps.lk.Lock()
defer ps.lk.Unlock()

p := ps.findOrCreatePeer(pid)
p.ReceivedBundles++
}

func (ps *PeerSet) IncreaseInvalidBundlesCounter(pid peer.ID) {
ps.lk.Lock()
defer ps.lk.Unlock()
ps.metric.UpdateInvalidMetric(bytes)

p := ps.findOrCreatePeer(pid)
p.InvalidBundles++
p.Metric.UpdateInvalidMetric(bytes)
}

func (ps *PeerSet) IncreaseReceivedBytesCounter(pid peer.ID, msgType message.Type, c int64) {
func (ps *PeerSet) UpdateReceivedMetric(pid peer.ID, msgType message.Type, bytes int64) {
ps.lk.Lock()
defer ps.lk.Unlock()

p := ps.findOrCreatePeer(pid)
p.ReceivedBytes[msgType] += c
ps.metric.UpdateReceivedMetric(msgType, bytes)

ps.totalReceivedBytes += c
ps.receivedBytes[msgType] += c
p := ps.findOrCreatePeer(pid)
p.Metric.UpdateReceivedMetric(msgType, bytes)
}

func (ps *PeerSet) IncreaseSentCounters(msgType message.Type, c int64, pid *peer.ID) {
func (ps *PeerSet) UpdateSentMetric(pid *peer.ID, msgType message.Type, bytes int64) {
ps.lk.Lock()
defer ps.lk.Unlock()

ps.totalSentBundles++
ps.totalSentBytes += c
ps.sentBytes[msgType] += c
ps.metric.UpdateSentMetric(msgType, bytes)

if pid != nil {
p := ps.findOrCreatePeer(*pid)
p.SentBytes[msgType] += c
p.Metric.UpdateSentMetric(msgType, bytes)
}
}

func (ps *PeerSet) TotalSentBundles() int {
ps.lk.RLock()
defer ps.lk.RUnlock()

return ps.totalSentBundles
}

func (ps *PeerSet) TotalSentBytes() int64 {
ps.lk.RLock()
defer ps.lk.RUnlock()

return ps.totalSentBytes
}

func (ps *PeerSet) TotalReceivedBytes() int64 {
ps.lk.RLock()
defer ps.lk.RUnlock()

return ps.totalReceivedBytes
}

func (ps *PeerSet) SentBytesMessageType(msgType message.Type) int64 {
if sentBytes, ok := ps.sentBytes[msgType]; ok {
return sentBytes
}

return 0
}

func (ps *PeerSet) ReceivedBytesMessageType(msgType message.Type) int64 {
if receivedBytes, ok := ps.receivedBytes[msgType]; ok {
return receivedBytes
}

return 0
}

func (ps *PeerSet) SentBytes() map[message.Type]int64 {
ps.lk.RLock()
defer ps.lk.RUnlock()

return maps.Clone(ps.sentBytes)
}

func (ps *PeerSet) ReceivedBytes() map[message.Type]int64 {
ps.lk.RLock()
defer ps.lk.RUnlock()

return maps.Clone(ps.receivedBytes)
return int(ps.metric.TotalSent.Bundles)
}

func (ps *PeerSet) StartedAt() time.Time {
Expand Down Expand Up @@ -375,6 +317,13 @@ func (ps *PeerSet) Sessions() []*session.Session {
return ps.sessionManager.Sessions()
}

func (ps *PeerSet) Metric() metric.Metric {
ps.lk.RLock()
defer ps.lk.RUnlock()

return ps.metric
}

// GetRandomPeer selects a random peer from the peer set based on their download score.
// Peers with higher score are more likely to be selected.
func (ps *PeerSet) GetRandomPeer() *peer.Peer {
Expand Down
Loading

0 comments on commit c6cffcb

Please sign in to comment.