Skip to content

Commit

Permalink
feat: subscribing to libp2p eventbus (#831)
Browse files Browse the repository at this point in the history
  • Loading branch information
b00f authored Dec 1, 2023
1 parent d79fe60 commit e21352d
Show file tree
Hide file tree
Showing 47 changed files with 1,640 additions and 781 deletions.
4 changes: 2 additions & 2 deletions config/example_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@
## rotate_log_after_days = 1

# `compress` determines if the rotated log files should be compressed.
# Default is `false`.
## compress = false
# Default is `true`.
## compress = true

# `logger.levels` contains the level of logger per module.
# Available log levels are:
Expand Down
19 changes: 17 additions & 2 deletions network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ type EventType int
const (
EventTypeConnect EventType = 1
EventTypeDisconnect EventType = 2
EventTypeGossip EventType = 3
EventTypeStream EventType = 4
EventTypeProtocols EventType = 3
EventTypeGossip EventType = 4
EventTypeStream EventType = 5
)

func (t EventType) String() string {
Expand All @@ -38,6 +39,8 @@ func (t EventType) String() string {
return "connect"
case EventTypeDisconnect:
return "disconnect"
case EventTypeProtocols:
return "protocols"
case EventTypeGossip:
return "gossip-msg"
case EventTypeStream:
Expand Down Expand Up @@ -76,6 +79,7 @@ func (*StreamMessage) Type() EventType {
type ConnectEvent struct {
PeerID lp2pcore.PeerID
RemoteAddress string
Direction string
}

func (*ConnectEvent) Type() EventType {
Expand All @@ -91,6 +95,17 @@ func (*DisconnectEvent) Type() EventType {
return EventTypeDisconnect
}

// ProtocolsEvents represents updating protocols event.
type ProtocolsEvents struct {
PeerID lp2pcore.PeerID
Protocols []string
SupportStream bool
}

func (*ProtocolsEvents) Type() EventType {
return EventTypeProtocols
}

// ShouldPropagate determines whether a message should be disregarded:
// it will be neither delivered to the application nor forwarded to the network.
type ShouldPropagate func(*GossipMessage) bool
Expand Down
6 changes: 5 additions & 1 deletion network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo
n.mdns = newMdnsService(ctx, n.host, n.logger)
}
n.dht = newDHTService(n.ctx, n.host, kadProtocolID, isBootstrapper, conf, n.logger)
n.peerMgr = newPeerMgr(ctx, host, n.dht.kademlia, conf, n.logger)
n.peerMgr = newPeerMgr(ctx, host, conf, n.logger)
n.stream = newStreamService(ctx, n.host, streamProtocolID, n.eventChannel, n.logger)
n.gossip = newGossipService(ctx, n.host, n.eventChannel, isBootstrapper, n.logger)
n.notifee = newNotifeeService(ctx, n.host, n.eventChannel, n.peerMgr, streamProtocolID, isBootstrapper, n.logger)
Expand Down Expand Up @@ -354,9 +354,13 @@ func (n *network) TopicName(topic string) string {
}

func (n *network) CloseConnection(pid lp2ppeer.ID) {
n.logger.Debug("closing connection", "pid", pid)

if err := n.host.Network().ClosePeer(pid); err != nil {
n.logger.Warn("unable to close connection", "peer", pid)
}

n.logger.Debug("connection closed", "pid", pid)
}

func (n *network) String() string {
Expand Down
57 changes: 31 additions & 26 deletions network/notifee.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ type NotifeeService struct {
func newNotifeeService(ctx context.Context, host lp2phost.Host, eventChannel chan<- Event, peerMgr *peerMgr,
protocolID lp2pcore.ProtocolID, bootstrapper bool, log *logger.SubLogger,
) *NotifeeService {
eventSub, err := host.EventBus().Subscribe(lp2pevent.WildcardSubscription)
events := []interface{}{
new(lp2pevent.EvtLocalReachabilityChanged),
new(lp2pevent.EvtPeerIdentificationCompleted),
new(lp2pevent.EvtPeerProtocolsUpdated),
}
eventSub, err := host.EventBus().Subscribe(events)
if err != nil {
logger.Error("failed to register for libp2p events")
}
Expand All @@ -47,30 +52,20 @@ func newNotifeeService(ctx context.Context, host lp2phost.Host, eventChannel cha

func (s *NotifeeService) Start() {
go func() {
defer s.lp2pEventSub.Close()

for {
select {
case evt := <-s.lp2pEventSub.Out():
switch e := evt.(type) {
case lp2pevent.EvtLocalReachabilityChanged:
s.logger.Info("reachability changed", "reachability", e.Reachability)

case lp2pevent.EvtPeerConnectednessChanged:
s.logger.Debug("connectedness changed", "pid", e.Peer, "connectedness", e.Connectedness)
if e.Connectedness == lp2pnetwork.Connected {
s.sendConnectEvent(e.Peer)
} else if e.Connectedness == lp2pnetwork.NotConnected {
s.sendDisconnectEvent(e.Peer)
}

case lp2pevent.EvtPeerIdentificationCompleted:
s.logger.Debug("identification completed", "pid", e.Peer)
s.sendConnectEvent(e.Peer)
s.sendProtocolsEvent(e.Peer)

case lp2pevent.EvtPeerProtocolsUpdated:
s.logger.Debug("protocols updated", "pid", e.Peer, "protocols", e.Added)
s.sendConnectEvent(e.Peer)
s.sendProtocolsEvent(e.Peer)

default:
s.logger.Debug("unhandled libp2p event", "event", evt)
Expand All @@ -84,14 +79,15 @@ func (s *NotifeeService) Start() {
}

func (s *NotifeeService) Stop() {
s.lp2pEventSub.Close()
}

func (s *NotifeeService) Connected(_ lp2pnetwork.Network, conn lp2pnetwork.Conn) {
pid := conn.RemotePeer()
s.logger.Info("connected to peer", "pid", pid, "direction", conn.Stat().Direction)

s.peerMgr.AddPeer(pid, conn.RemoteMultiaddr(), conn.Stat().Direction)
s.sendConnectEvent(pid)
s.sendConnectEvent(pid, conn.RemoteMultiaddr(), conn.Stat().Direction)
}

func (s *NotifeeService) Disconnected(_ lp2pnetwork.Network, conn lp2pnetwork.Conn) {
Expand All @@ -113,20 +109,29 @@ func (s *NotifeeService) ListenClose(_ lp2pnetwork.Network, ma multiaddr.Multiad
s.logger.Debug("notifee ListenClose event emitted", "addr", ma.String())
}

func (s *NotifeeService) sendConnectEvent(pid lp2pcore.PeerID) {
protocols, err := s.host.Peerstore().GetProtocols(pid)
if err != nil {
s.logger.Error("unable to get supported protocols", "pid", pid)
func (s *NotifeeService) sendProtocolsEvent(pid lp2pcore.PeerID) {
protocols, _ := s.host.Peerstore().GetProtocols(pid)
protocolsStr := []string{}
for _, p := range protocols {
protocolsStr = append(protocolsStr, string(p))
}

slices.Sort(protocolsStr)
supportStream := slices.Contains(protocols, s.streamProtocolID)
if supportStream {
addr := s.peerMgr.GetMultiAddr(pid)
if supportStream && addr != nil {
s.eventChannel <- &ConnectEvent{
PeerID: pid,
RemoteAddress: addr.String(),
}
}
s.eventChannel <- &ProtocolsEvents{
PeerID: pid,
Protocols: protocolsStr,
SupportStream: supportStream,
}
}

func (s *NotifeeService) sendConnectEvent(pid lp2pcore.PeerID,
remoteAddress multiaddr.Multiaddr, direction lp2pnetwork.Direction,
) {
s.eventChannel <- &ConnectEvent{
PeerID: pid,
RemoteAddress: remoteAddress.String(),
Direction: direction.String(),
}
}

Expand Down
20 changes: 7 additions & 13 deletions network/peermgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sync"
"time"

lp2pdht "github.com/libp2p/go-libp2p-kad-dht"
lp2phost "github.com/libp2p/go-libp2p/core/host"
lp2pnet "github.com/libp2p/go-libp2p/core/network"
lp2ppeer "github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -29,13 +28,12 @@ type peerMgr struct {
minConns int
maxConns int
host lp2phost.Host
dht *lp2pdht.IpfsDHT
peers map[lp2ppeer.ID]*peerInfo
logger *logger.SubLogger
}

// newPeerMgr creates a new Peer Manager instance.
func newPeerMgr(ctx context.Context, h lp2phost.Host, dht *lp2pdht.IpfsDHT,
func newPeerMgr(ctx context.Context, h lp2phost.Host,
conf *Config, log *logger.SubLogger,
) *peerMgr {
b := &peerMgr{
Expand All @@ -45,7 +43,6 @@ func newPeerMgr(ctx context.Context, h lp2phost.Host, dht *lp2pdht.IpfsDHT,
maxConns: conf.MaxConns,
peers: make(map[lp2ppeer.ID]*peerInfo),
host: h,
dht: dht,
logger: log,
}

Expand Down Expand Up @@ -128,9 +125,6 @@ func (mgr *peerMgr) CheckConnectivity() {
connectedness := net.Connectedness(pid)
if connectedness == lp2pnet.Connected {
connectedPeers = append(connectedPeers, pid)
} else {
mgr.logger.Debug("peer is not connected to us", "peer", pid)
delete(mgr.peers, pid)
}
}

Expand All @@ -146,20 +140,20 @@ func (mgr *peerMgr) CheckConnectivity() {
"count", len(connectedPeers),
"min", mgr.minConns)

for _, pi := range mgr.bootstrapAddrs {
mgr.logger.Debug("try connecting to a bootstrap peer", "peer", pi.String())
for _, ai := range mgr.bootstrapAddrs {
mgr.logger.Debug("try connecting to a bootstrap peer", "peer", ai.String())

// Don't try to connect to an already connected peer.
if HasPID(connectedPeers, pi.ID) {
mgr.logger.Trace("already connected", "peer", pi.String())
if HasPID(connectedPeers, ai.ID) {
mgr.logger.Trace("already connected", "peer", ai.String())
continue
}

if swarm, ok := mgr.host.Network().(*lp2pswarm.Swarm); ok {
swarm.Backoff().Clear(pi.ID)
swarm.Backoff().Clear(ai.ID)
}

ConnectAsync(mgr.ctx, mgr.host, pi, mgr.logger)
ConnectAsync(mgr.ctx, mgr.host, ai, mgr.logger)
}
}
}
2 changes: 1 addition & 1 deletion sync/handler_block_announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ func TestBroadcastingBlockAnnounceMessages(t *testing.T) {
msg := message.NewBlockAnnounceMessage(blk, cert)
td.sync.broadcast(msg)

msg1 := td.shouldPublishMessageWithThisType(t, td.network, message.TypeBlockAnnounce)
msg1 := td.shouldPublishMessageWithThisType(t, message.TypeBlockAnnounce)
assert.Equal(t, msg1.Message.(*message.BlockAnnounceMessage).Certificate.Height(), msg.Certificate.Height())
}
40 changes: 19 additions & 21 deletions sync/handler_blocks_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ func (handler *blocksRequestHandler) ParseMessage(m message.Message, pid peer.ID
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected,
fmt.Sprintf("unknown peer (%s)", pid.String()), msg.SessionID, 0, nil, nil)

return handler.respond(response, pid)
handler.respond(response, pid)
return nil
}

if !p.IsKnownOrTrusty() {
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected,
fmt.Sprintf("not handshaked (%s)", p.Status.String()), msg.SessionID, 0, nil, nil)

return handler.respond(response, pid)
handler.respond(response, pid)
return nil
}

if !handler.config.NodeNetwork {
Expand All @@ -44,7 +46,8 @@ func (handler *blocksRequestHandler) ParseMessage(m message.Message, pid peer.ID
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected,
fmt.Sprintf("the request height is not acceptable: %v", msg.From), msg.SessionID, 0, nil, nil)

return handler.respond(response, pid)
handler.respond(response, pid)
return nil
}
}
height := msg.From
Expand All @@ -54,7 +57,8 @@ func (handler *blocksRequestHandler) ParseMessage(m message.Message, pid peer.ID
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected,
fmt.Sprintf("too many blocks requested: %v-%v", msg.From, msg.Count), msg.SessionID, 0, nil, nil)

return handler.respond(response, pid)
handler.respond(response, pid)
return nil
}

// Help this peer to sync up
Expand All @@ -67,54 +71,48 @@ func (handler *blocksRequestHandler) ParseMessage(m message.Message, pid peer.ID

response := message.NewBlocksResponseMessage(message.ResponseCodeMoreBlocks,
message.ResponseCodeMoreBlocks.String(), msg.SessionID, height, blocksData, nil)
err := handler.respond(response, pid)
if err != nil {
return err
}
handler.respond(response, pid)

height += uint32(len(blocksData))
count -= uint32(len(blocksData))
if count <= 0 {
break
}
}
// To avoid sending blocks again, we update height for this peer
// Height is always greater than zeo.
peerHeight := height - 1

if msg.To() >= handler.state.LastBlockHeight() {
lastCert := handler.state.LastCertificate()
response := message.NewBlocksResponseMessage(message.ResponseCodeSynced,
message.ResponseCodeSynced.String(), msg.SessionID, peerHeight, nil, lastCert)
message.ResponseCodeSynced.String(), msg.SessionID, lastCert.Height(), nil, lastCert)

return handler.respond(response, pid)
handler.respond(response, pid)
return nil
}

response := message.NewBlocksResponseMessage(message.ResponseCodeNoMoreBlocks,
message.ResponseCodeNoMoreBlocks.String(), msg.SessionID, 0, nil, nil)

return handler.respond(response, pid)
handler.respond(response, pid)
return nil
}

func (handler *blocksRequestHandler) PrepareBundle(m message.Message) *bundle.Bundle {
return bundle.NewBundle(m)
}

func (handler *blocksRequestHandler) respond(msg *message.BlocksResponseMessage, to peer.ID) error {
func (handler *blocksRequestHandler) respond(msg *message.BlocksResponseMessage, to peer.ID) {
if msg.ResponseCode == message.ResponseCodeRejected {
handler.logger.Debug("rejecting block request message", "msg", msg,
"to", to, "reason", msg.Reason)

_ = handler.sendTo(msg, to)
handler.sendTo(msg, to)

// There is no point in keeping this stream connection open.
// Close this connection to initiate a new handshake.
handler.network.CloseConnection(to)
} else {
handler.logger.Info("responding block request message", "msg", msg, "to", to)

return nil
handler.sendTo(msg, to)
}

handler.logger.Info("responding block request message", "msg", msg, "to", to)

return handler.sendTo(msg, to)
}
Loading

0 comments on commit e21352d

Please sign in to comment.